This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch add-for-test
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit f3eacde4edfa71af9ad31e1c1207c0b1313dee7a
Author: xwm1992 <[email protected]>
AuthorDate: Mon Aug 19 23:07:10 2024 +0800

    add for test
---
 eventmesh-admin-server/bin/start-admin.sh          |  51 +++++------
 eventmesh-admin-server/build.gradle                |   2 +
 eventmesh-admin-server/conf/application.yaml       |   8 +-
 eventmesh-admin-server/conf/eventmesh.sql          |   2 +-
 .../conf/mapper/EventMeshVerifyMapper.xml          |   5 +-
 .../eventmesh/admin/server/web/HttpServer.java     |  22 +++++
 .../admin/server/web/db/DBThreadPool.java          |  26 +++++-
 .../server/web/db/entity/EventMeshVerify.java      |   2 +
 .../web/handler/impl/FetchJobRequestHandler.java   |   2 +-
 ...fyHandler.java => ReportJobRequestHandler.java} |  37 +++++---
 .../web/handler/impl/ReportPositionHandler.java    |   1 +
 .../web/handler/impl/ReportVerifyHandler.java      |  50 ++++++++--
 .../server/web/service/job/JobInfoBizService.java  |  81 ++++++++++++++--
 .../service/position/impl/HttpPositionHandler.java |  57 ++++++++++++
 .../web/service/verify/VerifyBizService.java       |   1 +
 .../eventmesh/common/config/ConfigService.java     |  11 ++-
 .../config/connector/http}/HttpRetryConfig.java    |   2 +-
 .../config/connector/http}/HttpSinkConfig.java     |   2 +-
 .../config/connector/http}/HttpWebhookConfig.java  |   2 +-
 .../connector/http}/SinkConnectorConfig.java       |  11 +--
 .../connector/http/SourceConnectorConfig.java      |   6 +-
 .../apache/eventmesh/common/remote/JobState.java   |  51 +++++++++++
 .../eventmesh/common/remote/TransportType.java     |   1 +
 ...ortVerifyRequest.java => ReportJobRequest.java} |  14 +--
 .../common/remote/request/ReportVerifyRequest.java |   2 +
 .../apache/eventmesh/common/utils/JsonUtils.java   |  11 +++
 ...apache.eventmesh.common.remote.payload.IPayload |   1 +
 .../canal/sink/connector/CanalSinkConnector.java   |  24 ++++-
 .../source/connector/CanalSourceConnector.java     |   5 +-
 .../connector/http/sink/HttpSinkConnector.java     |   4 +-
 .../http/sink/data/HttpConnectRecord.java          |  40 ++++++--
 .../http/sink/handler/AbstractHttpSinkHandler.java |   4 +-
 .../http/sink/handler/HttpSinkHandler.java         |   8 +-
 .../sink/handler/impl/CommonHttpSinkHandler.java   |  64 +++++++------
 .../handler/impl/HttpSinkHandlerRetryWrapper.java  |   4 +-
 .../sink/handler/impl/WebhookHttpSinkHandler.java  |  12 +--
 .../connector/http/source/data/WebhookRequest.java |   3 +
 .../http/source/protocol/impl/CommonProtocol.java  |  31 ++++++-
 .../http/source/protocol/impl/GitHubProtocol.java  |   2 +-
 ...ventmesh.openconnect.api.ConnectorCreateService |  12 +--
 .../connector/http/sink/HttpSinkConnectorTest.java |   4 +-
 .../api/connector/SinkConnectorContext.java        |   4 +
 .../offsetmgmt/admin/AdminOffsetService.java       |   3 +
 .../offsetmgmt/api/data/DefaultKeyValue.java       |   5 +
 eventmesh-runtime-v2/build.gradle                  |   1 +
 .../runtime/connector/ConnectorRuntime.java        | 102 ++++++++++++++++-----
 46 files changed, 607 insertions(+), 186 deletions(-)

diff --git a/eventmesh-admin-server/bin/start-admin.sh 
b/eventmesh-admin-server/bin/start-admin.sh
index 93c364439..163303661 100644
--- a/eventmesh-admin-server/bin/start-admin.sh
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -56,34 +56,34 @@ function extract_java_version {
 #}
 
 function get_pid {
-       local ppid=""
-       if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
-               ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
-               # If the process does not exist, it indicates that the previous 
process terminated abnormally.
+        local ppid=""
+        if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+                ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+                # If the process does not exist, it indicates that the 
previous process terminated abnormally.
     if [ ! -d /proc/$ppid ]; then
       # Remove the residual file.
       rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
       echo -e "ERROR\t EventMesh process had already terminated unexpectedly 
before, please check log output."
       ppid=""
     fi
-       else
-               if [[ $OS =~ Msys ]]; then
-                       # There is a Bug on Msys that may not be able to kill 
the identified process
-                       ppid=`jps -v | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v 
grep | awk -F ' ' {'print $1'}`
-               elif [[ $OS =~ Darwin ]]; then
-                       # Known problem: grep Java may not be able to 
accurately identify Java processes
-                       ppid=$(/bin/ps -o user,pid,command | grep "java" | grep 
-i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" 
|awk -F ' ' {'print $2'})
-               else
-                 if [ $DOCKER ]; then
-                   # No need to exclude root user in Docker containers.
-                   ppid=$(ps -C java -o user,pid,command --cols 99999 | grep 
-w $EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print 
$2'})
-                 else
+        else
+                if [[ $OS =~ Msys ]]; then
+                        # There is a Bug on Msys that may not be able to kill 
the identified process
+                        ppid=`jps -v | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v 
grep | awk -F ' ' {'print $1'}`
+                elif [[ $OS =~ Darwin ]]; then
+                        # Known problem: grep Java may not be able to 
accurately identify Java processes
+                        ppid=$(/bin/ps -o user,pid,command | grep "java" | 
grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev 
"^root" |awk -F ' ' {'print $2'})
+                else
+                  if [ $DOCKER ]; then
+                    # No need to exclude root user in Docker containers.
+                    ppid=$(ps -C java -o user,pid,command --cols 99999 | grep 
-w $EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print 
$2'})
+                  else
         # It is required to identify the process as accurately as possible on 
Linux.
         ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w 
$EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk 
-F ' ' {'print $2'})
       fi
-               fi
-       fi
-       echo "$ppid";
+                fi
+        fi
+        echo "$ppid";
 }
 
 
#===========================================================================================
@@ -136,8 +136,7 @@ export JAVA_HOME
 
 GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
 
-#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m 
-XX:SurvivorRatio=4"
-JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep 
APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
 JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m 
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
 JAVA_OPT="${JAVA_OPT} -verbose:gc"
 if [[ "$JAVA_VERSION" == "8" ]]; then
@@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} 
-DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
 #            echo "proxy is running already"
 #            exit 9;
 #        else
-#          echo "err pid$pid, rm pid.file"
+#           echo "err pid$pid, rm pid.file"
 #            rm pid.file
 #        fi
 #fi
@@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
   exit 9
 fi
 if [ -n "$pid" ]; then
-       echo -e "ERROR\t The server is already running (pid=$pid), there is no 
need to execute start.sh again."
-       exit 9
+        echo -e "ERROR\t The server is already running (pid=$pid), there is no 
need to execute start.sh again."
+        exit 9
 fi
 
 make_logs_dir
@@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> 
${EVENTMESH_ADMIN_LOG_H
 
 EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
 if [ $DOCKER ]; then
-       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+        $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
 else
-       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+        $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
 echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
 fi
 exit 0
diff --git a/eventmesh-admin-server/build.gradle 
b/eventmesh-admin-server/build.gradle
index 1fec2c7c5..95c8fa137 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -38,6 +38,8 @@ dependencies {
        implementation "com.alibaba:druid-spring-boot-starter"
     compileOnly 'com.mysql:mysql-connector-j'
        compileOnly 'org.projectlombok:lombok'
+    testImplementation 'junit:junit:4.12'
+    testImplementation 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
 }
 
diff --git a/eventmesh-admin-server/conf/application.yaml 
b/eventmesh-admin-server/conf/application.yaml
index 274196db6..3d702e579 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -35,8 +35,8 @@ event-mesh:
     # grpc server port
     port: 8081
     adminServerList:
-      region1:
+      R1:
         - http://localhost:8082
-      region2:
-        - http://localhost:8083
-    region: region1
\ No newline at end of file
+      R2:
+        - http://localhost:8082
+    region: R1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql 
b/eventmesh-admin-server/conf/eventmesh.sql
index 986320570..6e28daca8 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
-  UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
   KEY `jobID` (`jobID`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
 
@@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
 CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+  `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
   `recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
   `recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
   `connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml 
b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
index b7b042145..45727498c 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
@@ -26,6 +26,7 @@
     <resultMap id="BaseResultMap" 
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify">
             <id property="id" column="id" jdbcType="INTEGER"/>
             <result property="taskID" column="taskID" jdbcType="VARCHAR"/>
+            <result property="jobID" column="jobID" jdbcType="VARCHAR"/>
             <result property="recordID" column="recordID" jdbcType="VARCHAR"/>
             <result property="recordSig" column="recordSig" 
jdbcType="VARCHAR"/>
             <result property="connectorName" column="connectorName" 
jdbcType="VARCHAR"/>
@@ -35,8 +36,8 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id,taskID,recordID,
-        recordSig,connectorName,connectorStage,
+        id,taskID,jobID,recordID,
+        recordSig,connectorName,connectorStage,
         position,createTime
     </sql>
 </mapper>
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index 12afb3a3d..8350802f7 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -17,8 +17,11 @@
 
 package org.apache.eventmesh.admin.server.web;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
+import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
 import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
 import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
 import org.apache.eventmesh.common.utils.JsonUtils;
 
@@ -31,17 +34,36 @@ import 
org.springframework.web.bind.annotation.RestController;
 
 @RestController
 @RequestMapping("/eventmesh/admin")
+@Slf4j
 public class HttpServer {
 
     @Autowired
     private TaskBizService taskService;
 
+    @Autowired
+    private VerifyBizService verifyService;
+
     @RequestMapping(value = "/createTask", method = RequestMethod.POST)
     public ResponseEntity<Object> createOrUpdateTask(@RequestBody 
CreateTaskRequest task) {
+        log.info("receive http proto create task:{}",task);
         CreateTaskResponse createTaskResponse = taskService.createTask(task);
+        log.info("receive http proto create task 
result:{}",createTaskResponse);
         return 
ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
     }
 
+
+    @RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
+    public ResponseEntity<Object> reportVerify(@RequestBody 
ReportVerifyRequest request) {
+        log.info("receive http proto report verify request:{}", request);
+        boolean result = verifyService.reportVerifyRecord(request);
+        log.info("receive http proto report verify result:{}", result);
+        if (result) {
+            return ResponseEntity.ok("report verify success.request:" + 
JsonUtils.toJSONString(request));
+        } else {
+            return ResponseEntity.internalServerError().body("report verify 
success.request:" + JsonUtils.toJSONString(request));
+        }
+    }
+
     public boolean deleteTask(Long id) {
         return false;
     }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
index f1de76496..124eca426 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.admin.server.web.db;
 import org.apache.eventmesh.common.EventMeshThreadFactory;
 
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -39,17 +40,34 @@ public class DBThreadPool {
             new LinkedBlockingQueue<>(1000), new 
EventMeshThreadFactory("admin-server-db"),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 
+
+    private final ScheduledThreadPoolExecutor checkScheduledExecutor =
+            new 
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new 
EventMeshThreadFactory("admin-server-check-scheduled"),
+                    new ThreadPoolExecutor.DiscardOldestPolicy());
+
     @PreDestroy
     private void destroy() {
         if (!executor.isShutdown()) {
             try {
                 executor.shutdown();
                 if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
-                    log.info("wait heart beat handler thread pool shutdown 
timeout, it will shutdown immediately");
+                    log.info("wait handler thread pool shutdown timeout, it 
will shutdown immediately");
                     executor.shutdownNow();
                 }
             } catch (InterruptedException e) {
-                log.warn("wait heart beat handler thread pool shutdown fail");
+                log.warn("wait handler thread pool shutdown fail");
+            }
+        }
+
+        if (!checkScheduledExecutor.isShutdown()) {
+            try {
+                checkScheduledExecutor.shutdown();
+                if (!checkScheduledExecutor.awaitTermination(30, 
TimeUnit.SECONDS)) {
+                    log.info("wait scheduled thread pool shutdown timeout, it 
will shutdown immediately");
+                    checkScheduledExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                log.warn("wait scheduled thread pool shutdown fail");
             }
         }
     }
@@ -57,4 +75,8 @@ public class DBThreadPool {
     public ThreadPoolExecutor getExecutors() {
         return executor;
     }
+
+    public ScheduledThreadPoolExecutor getCheckExecutor() {
+        return checkScheduledExecutor;
+    }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
index 5425c5c57..9d3e817ff 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
@@ -37,6 +37,8 @@ public class EventMeshVerify implements Serializable {
 
     private String taskID;
 
+    private String jobID;
+
     private String recordID;
 
     private String recordSig;
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index b377bcddd..3392084c2 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -56,7 +56,7 @@ public class FetchJobRequestHandler extends 
BaseRequestHandler<FetchJobRequest,
         
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
         config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
         
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
-        config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
+        config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
         response.setConnectorConfig(config);
         response.setTransportType(detail.getTransportType());
         response.setState(detail.getState());
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
similarity index 54%
copy from 
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
copy to 
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
index 39963494c..defec3f8e 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
@@ -17,33 +17,40 @@
 
 package org.apache.eventmesh.admin.server.web.handler.impl;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
 import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
-import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
-import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
 import org.apache.eventmesh.common.remote.response.SimpleResponse;
-
-import org.apache.commons.lang3.StringUtils;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import lombok.extern.slf4j.Slf4j;
-
 @Component
 @Slf4j
-public class ReportVerifyHandler extends 
BaseRequestHandler<ReportVerifyRequest, SimpleResponse> {
+public class ReportJobRequestHandler extends 
BaseRequestHandler<ReportJobRequest, SimpleResponse> {
+
     @Autowired
-    private VerifyBizService verifyService;
+    JobInfoBizService jobInfoBizService;
 
     @Override
-    protected SimpleResponse handler(ReportVerifyRequest request, Metadata 
metadata) {
-        if (StringUtils.isAnyBlank(request.getTaskID(), 
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
-            log.info("report verify request [{}] illegal", request);
-            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task 
id, sign, record id or stage is none");
+    public SimpleResponse handler(ReportJobRequest request, Metadata metadata) 
{
+        log.info("receive report job request:{}", request);
+        if (StringUtils.isBlank(request.getJobID())) {
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, 
it's empty");
+        }
+        EventMeshJobInfo jobInfo = 
jobInfoBizService.getJobInfo(request.getJobID());
+        if (jobInfo == null) {
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, 
not exist target job,jobID:" + request.getJobID());
+        }
+        boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), 
request.getState());
+        if (result) {
+            return SimpleResponse.success();
+        } else {
+            return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job 
failed.");
         }
-        return verifyService.reportVerifyRecord(request) ? 
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify "
-                + "request fail");
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
index 5e2a96826..78335d419 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
@@ -48,6 +48,7 @@ public class ReportPositionHandler extends 
BaseRequestHandler<ReportPositionRequ
 
     @Override
     protected SimpleResponse handler(ReportPositionRequest request, Metadata 
metadata) {
+        log.info("receive report position request:{}", request);
         if (StringUtils.isBlank(request.getJobID())) {
             log.info("request [{}] illegal job id", request);
             return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, 
it's empty");
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
index 39963494c..99defbe7c 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
@@ -17,19 +17,23 @@
 
 package org.apache.eventmesh.admin.server.web.handler.impl;
 
+import org.apache.eventmesh.admin.server.AdminServerProperties;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
 import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
 import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
 import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
 import org.apache.eventmesh.common.remote.response.SimpleResponse;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
-
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.client.RestTemplate;
+import java.util.List;
+import java.util.Random;
 
 @Component
 @Slf4j
@@ -37,13 +41,45 @@ public class ReportVerifyHandler extends 
BaseRequestHandler<ReportVerifyRequest,
     @Autowired
     private VerifyBizService verifyService;
 
+    @Autowired
+    JobInfoBizService jobInfoBizService;
+
+    @Autowired
+    private AdminServerProperties properties;
+
     @Override
     protected SimpleResponse handler(ReportVerifyRequest request, Metadata 
metadata) {
-        if (StringUtils.isAnyBlank(request.getTaskID(), 
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
+        if (StringUtils.isAnyBlank(request.getTaskID(), request.getJobID(), 
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
             log.info("report verify request [{}] illegal", request);
-            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task 
id, sign, record id or stage is none");
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task 
id,job id, sign, record id or stage is none");
+        }
+
+        String jobID = request.getJobID();
+        EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(jobID);
+        if (jobInfo == null || StringUtils.isBlank(jobInfo.getFromRegion())) {
+            log.info("report verify job info [{}] illegal", request);
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "job info is 
null or fromRegion is blank,job id:" + jobID);
+        }
+
+        String fromRegion = jobInfo.getFromRegion();
+        String localRegion = properties.getRegion();
+        log.info("report verify request from 
region:{},localRegion:{},request:{}", fromRegion, localRegion, request);
+        if(fromRegion.equalsIgnoreCase(localRegion)){
+            return verifyService.reportVerifyRecord(request) ? 
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify "
+                    + "request fail");
+        } else {
+            log.info("start transfer report verify to from region admin 
server. from region:{}", fromRegion);
+            List<String> adminServerList = 
properties.getAdminServerList().get(fromRegion);
+            if (adminServerList == null || adminServerList.isEmpty()) {
+                throw new RuntimeException("No admin server available for 
region: " + fromRegion);
+            }
+            String targetUrl = adminServerList.get(new 
Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportVerify";
+            RestTemplate restTemplate = new RestTemplate();
+            ResponseEntity<String> response = 
restTemplate.postForEntity(targetUrl, request, String.class);
+            if (!response.getStatusCode().is2xxSuccessful()) {
+                return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify request fail,code:" + response.getStatusCode() + ",msg:" + 
response.getBody());
+            }
+            return SimpleResponse.success();
         }
-        return verifyService.reportVerifyRecord(request) ? 
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify "
-                + "request fail");
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 0657383e2..70abececb 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -19,15 +19,19 @@ package org.apache.eventmesh.admin.server.web.service.job;
 
 import org.apache.eventmesh.admin.server.AdminServerProperties;
 import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import org.apache.eventmesh.admin.server.web.db.DBThreadPool;
 import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
 import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import 
org.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
+import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
 import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
 import 
org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
 import 
org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
 import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.remote.JobState;
 import org.apache.eventmesh.common.remote.TaskState;
 import org.apache.eventmesh.common.remote.TransportType;
 import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -35,20 +39,18 @@ import 
org.apache.eventmesh.common.remote.datasource.DataSourceType;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
 import org.apache.eventmesh.common.remote.request.CreateOrUpdateDataSourceReq;
 import org.apache.eventmesh.common.utils.JsonUtils;
-
 import org.apache.commons.lang3.StringUtils;
-
+import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
-
+import java.util.concurrent.TimeUnit;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
-
 import lombok.extern.slf4j.Slf4j;
+import javax.annotation.PostConstruct;
 
 /**
  * for table 'event_mesh_job_info' db operation
@@ -75,13 +77,41 @@ public class JobInfoBizService {
     @Autowired
     private AdminServerProperties properties;
 
+    @Autowired
+    EventMeshRuntimeHeartbeatService heartbeatService;
+
+    private final long heatBeatPeriod = Duration.ofMillis(5000).toMillis();
+
+    @Autowired
+    DBThreadPool executor;
+
+    @PostConstruct
+    public void init() {
+        log.info("init check job info scheduled task.");
+        executor.getCheckExecutor().scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                checkJobInfo();
+            }
+        }, 10, 10, TimeUnit.SECONDS);
+    }
+
     public boolean updateJobState(String jobID, TaskState state) {
         if (jobID == null || state == null) {
             return false;
         }
         EventMeshJobInfo jobInfo = new EventMeshJobInfo();
         jobInfo.setJobState(state.name());
-        return jobInfoService.update(jobInfo, 
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state", 
TaskState.DELETE.name()));
+        return jobInfoService.update(jobInfo, 
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState", 
JobState.DELETE.name()));
+    }
+
+    public boolean updateJobState(String jobID, JobState state) {
+        if (jobID == null || state == null) {
+            return false;
+        }
+        EventMeshJobInfo jobInfo = new EventMeshJobInfo();
+        jobInfo.setJobState(state.name());
+        return jobInfoService.update(jobInfo, 
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState", 
JobState.DELETE.name()));
     }
 
     @Transactional
@@ -114,7 +144,8 @@ public class JobInfoBizService {
             source.setOperator(job.getCreateUid());
             source.setRegion(job.getSourceDataSource().getRegion());
             source.setDesc(job.getSourceConnectorDesc());
-            source.setConfig(job.getSourceDataSource().getConf());
+            Config sourceConfig = job.getSourceDataSource().getConf();
+            source.setConfig(sourceConfig);
             
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
             EventMeshDataSource createdSource = 
dataSourceBizService.createDataSource(source);
             entity.setSourceData(createdSource.getId());
@@ -124,7 +155,8 @@ public class JobInfoBizService {
             sink.setOperator(job.getCreateUid());
             sink.setRegion(job.getSinkDataSource().getRegion());
             sink.setDesc(job.getSinkConnectorDesc());
-            sink.setConfig(job.getSinkDataSource().getConf());
+            Config sinkConfig = job.getSinkDataSource().getConf();
+            sink.setConfig(sinkConfig);
             
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
             EventMeshDataSource createdSink = 
dataSourceBizService.createDataSource(sink);
             entity.setTargetData(createdSink.getId());
@@ -134,7 +166,7 @@ public class JobInfoBizService {
         int changed = jobInfoExtService.batchSave(entityList);
         if (changed != jobs.size()) {
             throw new AdminServerRuntimeException(ErrorCode.INTERNAL_ERR, 
String.format("create [%d] jobs of not match expect [%d]",
-                changed, jobs.size()));
+                    changed, jobs.size()));
         }
         return entityList;
     }
@@ -168,7 +200,7 @@ public class JobInfoBizService {
             detail.setSourceConnectorDesc(source.getDescription());
             if (source.getDataType() != null) {
                 
detail.setPositions(positionBizService.getPositionByJobID(job.getJobID(),
-                    DataSourceType.getDataSourceType(source.getDataType())));
+                        
DataSourceType.getDataSourceType(source.getDataType())));
 
             }
         }
@@ -195,6 +227,35 @@ public class JobInfoBizService {
         
detail.setTransportType(TransportType.getTransportType(job.getTransportType()));
         return detail;
     }
+
+    public EventMeshJobInfo getJobInfo(String jobID) {
+        if (jobID == null) {
+            return null;
+        }
+        EventMeshJobInfo job = 
jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
+        return job;
+    }
+
+    public void checkJobInfo() {
+        List<EventMeshJobInfo> eventMeshJobInfoList = 
jobInfoService.list(Wrappers.<EventMeshJobInfo>query().eq("jobState", 
JobState.RUNNING.name()));
+        log.info("start check job info.to check job size:{}", 
eventMeshJobInfoList.size());
+        for (EventMeshJobInfo jobInfo : eventMeshJobInfoList) {
+            String jobID = jobInfo.getJobID();
+            if (StringUtils.isEmpty(jobID)) {
+                continue;
+            }
+            EventMeshRuntimeHeartbeat heartbeat = 
heartbeatService.getOne(Wrappers.<EventMeshRuntimeHeartbeat>query().eq("jobID", 
jobID));
+            if (heartbeat == null) {
+                continue;
+            }
+            // if last heart beat update time have delay three period.print 
job heart beat delay warn
+            long currentTimeStamp = System.currentTimeMillis();
+            if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 * 
heatBeatPeriod) {
+                log.warn("current job heart heart has 
delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID, 
currentTimeStamp, heartbeat.getUpdateTime());
+            }
+        }
+    }
+
 }
 
 
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
new file mode 100644
index 000000000..b0f89ec03
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.admin.server.web.service.position.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
+import org.apache.eventmesh.admin.server.web.service.position.PositionHandler;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+@Slf4j
+public class HttpPositionHandler extends PositionHandler {
+    @Autowired
+    EventMeshPositionReporterHistoryService historyService;
+
+    @Override
+    protected DataSourceType getSourceType() {
+        return DataSourceType.HTTP;
+    }
+
+    @Override
+    public boolean handler(ReportPositionRequest request, Metadata metadata) {
+        log.info("receive http position report request:{}", request);
+        // mock wemq postion report store
+        return true;
+    }
+
+    @Override
+    public List<RecordPosition> handler(FetchPositionRequest request, Metadata 
metadata) {
+        // mock http position fetch request
+        List<RecordPosition> recordPositionList = new ArrayList<>();
+        return recordPositionList;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
index 74f208b19..9d648e0a7 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
@@ -35,6 +35,7 @@ public class VerifyBizService {
         verify.setRecordSig(request.getRecordSig());
         verify.setPosition(request.getPosition());
         verify.setTaskID(request.getTaskID());
+        verify.setJobID(request.getJobID());
         verify.setConnectorName(request.getConnectorName());
         verify.setConnectorStage(request.getConnectorStage());
         return verifyService.save(verify);
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
index 939c9d8d6..3f3f609a1 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/ConfigService.java
@@ -131,7 +131,7 @@ public class ConfigService {
         } else {
             filePath = path.startsWith(FILE_PATH_PREFIX) ? 
path.substring(FILE_PATH_PREFIX.length()) : this.configPath + path;
         }
-
+        filePath = normalizeFilePath(filePath);
         if (filePath.contains(".jar")) {
             try (final InputStream inputStream = 
getClass().getResourceAsStream(Objects.requireNonNull(resourceUrl))) {
                 if (inputStream == null) {
@@ -152,6 +152,15 @@ public class ConfigService {
         return (T) object;
     }
 
+    private String normalizeFilePath(String filePath) {
+        if (System.getProperty("os.name").toLowerCase().contains("win")) {
+            if (filePath.startsWith("/")) {
+                filePath = filePath.substring(1);
+            }
+        }
+        return filePath;
+    }
+
     private void populateConfig(Object object, Class<?> clazz, Config config)
         throws NoSuchFieldException, IOException, IllegalAccessException {
         ConfigInfo configInfo = new ConfigInfo();
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java
similarity index 95%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
rename to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java
index 08c3a323e..319732a87 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpRetryConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
 
 import lombok.Data;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java
similarity index 94%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
rename to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java
index 5997b90b7..3c429f335 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpSinkConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpSinkConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
 
 import org.apache.eventmesh.common.config.connector.SinkConfig;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java
similarity index 95%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
rename to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java
index f15bac456..96b9e0982 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpWebhookConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/HttpWebhookConfig.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
 
 import lombok.Data;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
similarity index 84%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
rename to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
index 9bb338cce..ccebe5a99 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/SinkConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SinkConnectorConfig.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.config;
+package org.apache.eventmesh.common.config.connector.http;
 
-import io.vertx.core.http.HttpClientOptions;
 
 import lombok.Data;
 
@@ -29,19 +28,19 @@ public class SinkConnectorConfig {
     private String[] urls;
 
     // keepAlive, default true
-    private boolean keepAlive = HttpClientOptions.DEFAULT_KEEP_ALIVE;
+    private boolean keepAlive = true;
 
     // timeunit: ms, default 60000ms
-    private int keepAliveTimeout = 
HttpClientOptions.DEFAULT_KEEP_ALIVE_TIMEOUT * 1000; // Keep units consistent
+    private int keepAliveTimeout = 60 * 1000; // Keep units consistent
 
     // timeunit: ms, default 5000ms, recommended scope: 5000ms - 10000ms
     private int connectionTimeout = 5000;
 
     // timeunit: ms, default 5000ms
-    private int idleTimeout;
+    private int idleTimeout = 5000;
 
     // maximum number of HTTP/1 connections a client will pool, default 5
-    private int maxConnectionPoolSize = 
HttpClientOptions.DEFAULT_MAX_POOL_SIZE;
+    private int maxConnectionPoolSize = 5;
 
     // retry config
     private HttpRetryConfig retryConfig = new HttpRetryConfig();
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index b7f075e6d..58d910bf2 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -27,7 +27,7 @@ public class SourceConnectorConfig {
 
     private String connectorName;
 
-    private String path;
+    private String path = "/";
 
     private int port;
 
@@ -51,11 +51,11 @@ public class SourceConnectorConfig {
     private int batchSize = 10;
 
     // protocol, default CloudEvent
-    private String protocol = "CloudEvent";
+    private String protocol = "Common";
 
     // extra config, e.g. GitHub secret
     private Map<String, String> extraConfig = new HashMap<>();
 
     // data consistency enabled, default true
-    private boolean dataConsistencyEnabled = true;
+    private boolean dataConsistencyEnabled = false;
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
new file mode 100644
index 000000000..53d20f2ac
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.remote;
+
+import lombok.ToString;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@ToString
+public enum JobState {
+    INIT, RUNNING, COMPLETE, DELETE, FAIL;
+    private static final JobState[] STATES_NUM_INDEX = JobState.values();
+    private static final Map<String, JobState> STATES_NAME_INDEX = new 
HashMap<>();
+    static {
+        for (JobState jobState : STATES_NUM_INDEX) {
+            STATES_NAME_INDEX.put(jobState.name(), jobState);
+        }
+    }
+
+    public static JobState fromIndex(Integer index) {
+        if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) {
+            return null;
+        }
+
+        return STATES_NUM_INDEX[index];
+    }
+
+    public static JobState fromIndex(String index) {
+        if (index == null || index.isEmpty()) {
+            return null;
+        }
+
+        return STATES_NAME_INDEX.get(index);
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
index 82e7bc021..6b4359839 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
@@ -35,6 +35,7 @@ public enum TransportType {
     HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS),
     HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ),
     REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ),
+    HTTP_HTTP(DataSourceType.HTTP, DataSourceType.HTTP),
     ;
     private static final Map<String, TransportType> INDEX_TYPES = new 
HashMap<>();
     private static final TransportType[] TYPES = TransportType.values();
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
similarity index 80%
copy from 
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
copy to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
index cd541949f..9e7444459 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
@@ -20,21 +20,17 @@ package org.apache.eventmesh.common.remote.request;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+import org.apache.eventmesh.common.remote.JobState;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
 @ToString
-public class ReportVerifyRequest extends BaseRemoteRequest {
+public class ReportJobRequest extends BaseRemoteRequest {
 
-    private String taskID;
+    private String jobID;
 
-    private String recordID;
+    private JobState state;
 
-    private String recordSig;
+    private String address;
 
-    private String connectorName;
-
-    private String connectorStage;
-
-    private String position;
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
index cd541949f..bd38881c3 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
@@ -28,6 +28,8 @@ public class ReportVerifyRequest extends BaseRemoteRequest {
 
     private String taskID;
 
+    private String jobID;
+
     private String recordID;
 
     private String recordSig;
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
index 9e9cea304..f2328541c 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonUtils.java
@@ -58,6 +58,10 @@ public class JsonUtils {
         return OBJECT_MAPPER.convertValue(fromValue, toValueType);
     }
 
+    public static <T> T convertValue(Object fromValue, TypeReference<T> 
toValueTypeRef) {
+        return OBJECT_MAPPER.convertValue(fromValue, toValueTypeRef);
+    }
+
     public static <T> T mapToObject(Map<String, Object> map, Class<T> 
beanClass) {
         if (map == null) {
             return null;
@@ -177,6 +181,13 @@ public class JsonUtils {
         }
     }
 
+    public static <T> T parseTypeReferenceObject(Object object, 
TypeReference<T> typeReference) {
+        if (object == null) {
+            return null;
+        }
+        return convertValue(object, typeReference);
+    }
+
     public static <T> T parseTypeReferenceObject(byte[] text, TypeReference<T> 
typeReference) {
         try {
             return OBJECT_MAPPER.readValue(text, typeReference);
diff --git 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
 
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 82d5c94dd..433cf57ed 100644
--- 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++ 
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
 org.apache.eventmesh.common.remote.request.FetchJobRequest
 org.apache.eventmesh.common.remote.response.FetchJobResponse
 org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportJobRequest
 org.apache.eventmesh.common.remote.request.ReportVerifyRequest
 org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
 org.apache.eventmesh.common.remote.request.FetchPositionRequest
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
index 49fb10dd3..57c9e5645 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.connector.canal.sink.connector;
 
 import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.connector.canal.CanalConnectRecord;
 import org.apache.eventmesh.connector.canal.DatabaseConnection;
 import org.apache.eventmesh.connector.canal.SqlUtils;
@@ -75,6 +76,7 @@ import 
org.springframework.transaction.support.TransactionCallback;
 import org.springframework.util.CollectionUtils;
 
 import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -165,8 +167,11 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         DbLoadContext context = new DbLoadContext();
         for (ConnectRecord connectRecord : sinkRecords) {
             List<CanalConnectRecord> canalConnectRecordList = new 
ArrayList<>();
+
+            List<CanalConnectRecord> canalConnectRecords = 
convertToCanalConnectRecord(connectRecord);
+
             // deep copy connectRecord data
-            for (CanalConnectRecord record : (List<CanalConnectRecord>) 
connectRecord.getData()) {
+            for (CanalConnectRecord record : canalConnectRecords) {
                 canalConnectRecordList.add(SerializationUtils.clone(record));
             }
             canalConnectRecordList = filterRecord(canalConnectRecordList);
@@ -302,7 +307,8 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
     private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig 
sinkConfig, ConnectRecord connectRecord) {
         int batchIndex = connectRecord.getExtension("batchIndex", 
Integer.class);
         int totalBatches = connectRecord.getExtension("totalBatches", 
Integer.class);
-        List<CanalConnectRecord> canalConnectRecordList = 
(List<CanalConnectRecord>) connectRecord.getData();
+        List<CanalConnectRecord> canalConnectRecordList = 
convertToCanalConnectRecord(connectRecord);
+
         String gtid = canalConnectRecordList.get(0).getCurrentGtid();
         GtidBatchManager.addBatch(gtid, batchIndex, totalBatches, 
canalConnectRecordList);
         // check whether the batch is complete
@@ -357,6 +363,20 @@ public class CanalSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
         }
     }
 
+    private List<CanalConnectRecord> convertToCanalConnectRecord(ConnectRecord 
connectRecord) {
+        List<CanalConnectRecord> canalConnectRecordList;
+        try {
+            canalConnectRecordList =
+                JsonUtils.parseTypeReferenceObject((byte[]) 
connectRecord.getData(), new TypeReference<List<CanalConnectRecord>>() {
+                });
+        } catch (Exception e) {
+            log.error("Failed to parse the canalConnectRecords.", e);
+            
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord,
 e));
+            throw new RuntimeException("Failed to parse the 
canalConnectRecords.", e);
+        }
+        return canalConnectRecordList;
+    }
+
     private List<List<CanalConnectRecord>> split(List<CanalConnectRecord> 
records) {
         List<List<CanalConnectRecord>> result = new ArrayList<>();
         if (records == null || records.isEmpty()) {
diff --git 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index ea5ccdeed..49bc05878 100644
--- 
a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -36,6 +36,7 @@ import 
org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import org.apache.commons.lang3.StringUtils;
 
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -312,7 +313,7 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
             entries = new ArrayList<>(message.getRawEntries().size());
             for (ByteString entry : message.getRawEntries()) {
                 try {
-                    entries.add(CanalEntry.Entry.parseFrom(entry));
+                    entries.add(Entry.parseFrom(entry));
                 } catch (InvalidProtocolBufferException e) {
                     throw new RuntimeException(e);
                 }
@@ -356,7 +357,7 @@ public class CanalSourceConnector implements Source, 
ConnectorCreateService<Sour
                     connectRecord.addExtension("messageId", 
String.valueOf(message.getId()));
                     connectRecord.addExtension("batchIndex", i);
                     connectRecord.addExtension("totalBatches", 
splitLists.size());
-                    connectRecord.setData(splitLists.get(i));
+                    
connectRecord.setData(JsonUtils.toJSONString(splitLists.get(i)).getBytes(StandardCharsets.UTF_8));
                     result.add(connectRecord);
                 }
             }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 9b6038bde..3df110f2e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -18,8 +18,8 @@
 package org.apache.eventmesh.connector.http.sink;
 
 import org.apache.eventmesh.common.config.connector.Config;
-import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
 import 
org.apache.eventmesh.connector.http.sink.handler.impl.CommonHttpSinkHandler;
 import 
org.apache.eventmesh.connector.http.sink.handler.impl.HttpSinkHandlerRetryWrapper;
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index 95b40afe9..9c8b1ce67 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -19,9 +19,11 @@ package org.apache.eventmesh.connector.http.sink.data;
 
 import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.KeyValue;
 
 import java.io.Serializable;
 import java.time.LocalDateTime;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -58,10 +60,9 @@ public class HttpConnectRecord implements Serializable {
      */
     private String eventId;
 
-    /**
-     * The ConnectRecord to be sent
-     */
-    private ConnectRecord data;
+    private Object data;
+
+    private KeyValue extensions;
 
     @Override
     public String toString() {
@@ -71,6 +72,7 @@ public class HttpConnectRecord implements Serializable {
             + ", type='" + type
             + ", eventId='" + eventId
             + ", data=" + data
+            + ", extensions=" + extensions
             + '}';
     }
 
@@ -83,16 +85,34 @@ public class HttpConnectRecord implements Serializable {
     public static HttpConnectRecord convertConnectRecord(ConnectRecord record, 
String type) {
         Map<String, ?> offsetMap = new HashMap<>();
         if (record != null && record.getPosition() != null && 
record.getPosition().getRecordOffset() != null) {
-            offsetMap = ((HttpRecordOffset) 
record.getPosition().getRecordOffset()).getOffsetMap();
+            if 
(HttpRecordOffset.class.equals(record.getPosition().getRecordOffsetClazz())) {
+                offsetMap = ((HttpRecordOffset) 
record.getPosition().getRecordOffset()).getOffsetMap();
+            }
         }
         String offset = "0";
         if (!offsetMap.isEmpty()) {
             offset = offsetMap.values().iterator().next().toString();
         }
-        return HttpConnectRecord.builder()
-            .type(type)
-            .eventId(type + "-" + offset)
-            .data(record)
-            .build();
+        if (record.getData() instanceof byte[]) {
+            String data = Base64.getEncoder().encodeToString((byte[]) 
record.getData());
+            record.addExtension("isBase64", true);
+            return HttpConnectRecord.builder()
+                .type(type)
+                .createTime(LocalDateTime.now())
+                .eventId(type + "-" + offset)
+                .data(data)
+                .extensions(record.getExtensions())
+                .build();
+        } else {
+            record.addExtension("isBase64", false);
+            return HttpConnectRecord.builder()
+                .type(type)
+                .createTime(LocalDateTime.now())
+                .eventId(type + "-" + offset)
+                .data(record.getData())
+                .extensions(record.getExtensions())
+                .build();
+        }
     }
+
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
index 36d01115b..5c868f4aa 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -17,7 +17,7 @@
 
 package org.apache.eventmesh.connector.http.sink.handler;
 
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
@@ -81,7 +81,7 @@ public abstract class AbstractHttpSinkHandler implements 
HttpSinkHandler {
             attributes.put(HttpRetryEvent.PREFIX + 
httpConnectRecord.getHttpRecordId(), retryEvent);
 
             // deliver the record
-            deliver(url, httpConnectRecord, attributes);
+            deliver(url, httpConnectRecord, attributes, record);
         }
     }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
index 1731809ab..697eb407d 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
@@ -33,14 +33,14 @@ import io.vertx.ext.web.client.HttpResponse;
  *
  * <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should 
implement this interface.
  * Implementing classes must provide implementations for the {@link #start()}, 
{@link #handle(ConnectRecord)},
- * {@link #deliver(URI, HttpConnectRecord, Map)}, and {@link #stop()} 
methods.</p>
+ * {@link #deliver(URI, HttpConnectRecord, Map, ConnectRecord)}, and {@link 
#stop()} methods.</p>
  *
  * <p>Implementing classes should ensure thread safety and handle HTTP/HTTPS 
communication efficiently.
  * The {@link #start()} method initializes any necessary resources for 
HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
- * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, 
HttpConnectRecord, Map)} method processes HttpConnectRecord on specified
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, 
HttpConnectRecord, Map, ConnectRecord)} method processes HttpConnectRecord on 
specified
  * URL while returning its own processing logic {@link #stop()} method 
releases any resources used for HTTP/HTTPS communication.</p>
  *
- * <p>It's recommended to handle exceptions gracefully within the {@link 
#deliver(URI, HttpConnectRecord, Map)} method
+ * <p>It's recommended to handle exceptions gracefully within the {@link 
#deliver(URI, HttpConnectRecord, Map, ConnectRecord)} method
  * to prevent message loss or processing interruptions.</p>
  */
 public interface HttpSinkHandler {
@@ -66,7 +66,7 @@ public interface HttpSinkHandler {
      * @param attributes        additional attributes to be used in processing
      * @return processing chain
      */
-    Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes);
+    Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord);
 
     /**
      * Cleans up and releases resources used by the HTTP/HTTPS handler. This 
method should be called when the handler is no longer needed.
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index 090784745..d498d449c 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -17,20 +17,29 @@
 
 package org.apache.eventmesh.connector.http.sink.handler.impl;
 
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
 import 
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
 import org.apache.eventmesh.connector.http.util.HttpUtils;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendMessageCallback;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.net.URI;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import io.netty.handler.codec.http.HttpHeaderNames;
@@ -104,22 +113,24 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
      * @return processing chain
      */
     @Override
-    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes) {
+    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord) 
{
         // create headers
+        Map<String, Object> extensionMap = new HashMap<>();
+        Set<String> extensionKeySet = 
httpConnectRecord.getExtensions().keySet();
+        for (String extensionKey : extensionKeySet) {
+            Object v = 
httpConnectRecord.getExtensions().getObject(extensionKey);
+            extensionMap.put(extensionKey, v);
+        }
+
         MultiMap headers = HttpHeaders.headers()
             .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
-            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
-
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8")
+            .set("extension", JsonUtils.toJSONString(extensionMap));
         // get timestamp and offset
-        Long timestamp = httpConnectRecord.getData().getTimestamp();
-        Map<String, ?> offset = null;
-        try {
-            // May throw NullPointerException.
-            offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
-        } catch (NullPointerException e) {
-            // ignore null pointer exception
-        }
-        final Map<String, ?> finalOffset = offset;
+        Long timestamp = httpConnectRecord.getCreateTime()
+                .atZone(ZoneId.systemDefault())
+                .toInstant()
+                .toEpochMilli();
 
         // send the request
         return this.webClient.post(url.getPath())
@@ -127,40 +138,38 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
             .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
             .putHeaders(headers)
             .ssl(Objects.equals(url.getScheme(), "https"))
-            .sendJson(httpConnectRecord)
+            .sendJson(httpConnectRecord.getData())
             .onSuccess(res -> {
-                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, finalOffset);
+                log.info("Request sent successfully. Record: timestamp={}", 
timestamp);
 
                 Exception e = null;
 
                 // log the response
                 if (HttpUtils.is2xxSuccessful(res.statusCode())) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Received successful response: 
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
-                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
+                        log.debug("Received successful response: 
statusCode={}. Record: timestamp={}, responseBody={}",
+                            res.statusCode(), timestamp, res.bodyAsString());
                     } else {
-                        log.info("Received successful response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
-                            finalOffset);
+                        log.info("Received successful response: statusCode={}. 
Record: timestamp={}", res.statusCode(), timestamp);
                     }
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}, responseBody={}",
-                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
+                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, responseBody={}",
+                            res.statusCode(), timestamp, res.bodyAsString());
                     } else {
-                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
-                            finalOffset);
+                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}", res.statusCode(), timestamp);
                     }
 
                     e = new RuntimeException("Unexpected HTTP response code: " 
+ res.statusCode());
                 }
 
                 // try callback
-                tryCallback(httpConnectRecord, e, attributes);
+                tryCallback(httpConnectRecord, e, attributes, connectRecord);
             }).onFailure(err -> {
-                log.error("Request failed to send. Record: timestamp={}, 
offset={}", timestamp, finalOffset, err);
+                log.error("Request failed to send. Record: timestamp={}", 
timestamp, err);
 
                 // try callback
-                tryCallback(httpConnectRecord, err, attributes);
+                tryCallback(httpConnectRecord, err, attributes, connectRecord);
             });
     }
 
@@ -171,7 +180,7 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
      * @param e                 the exception thrown during the request, may 
be null
      * @param attributes        additional attributes to be used in processing
      */
-    private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, 
Map<String, Object> attributes) {
+    private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, 
Map<String, Object> attributes, ConnectRecord record) {
         // get the retry event
         HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, 
httpConnectRecord, e);
 
@@ -180,7 +189,6 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
 
         if (multiHttpRequestContext.getRemainingRequests() == 0) {
             // do callback
-            ConnectRecord record = httpConnectRecord.getData();
             if (record.getCallback() == null) {
                 if (log.isDebugEnabled()) {
                     log.warn("ConnectRecord callback is null. Ignoring 
callback. {}", record);
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
index 268d0a0d6..9d7970a84 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -17,8 +17,8 @@
 
 package org.apache.eventmesh.connector.http.sink.handler.impl;
 
-import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import 
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index ff8f69d45..7031438da 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -19,8 +19,8 @@ package org.apache.eventmesh.connector.http.sink.handler.impl;
 
 import org.apache.eventmesh.common.exception.EventMeshException;
 import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
-import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig;
+import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
@@ -54,6 +54,7 @@ import com.alibaba.fastjson2.JSONWriter;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 /**
  * Extends CommonHttpSinkHandler to provide additional functionality for 
handling webhook features, including sending requests to callback servers,
@@ -209,9 +210,9 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
      * @return processing chain
      */
     @Override
-    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes) {
+    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes, ConnectRecord connectRecord) 
{
         // send the request
-        Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, 
httpConnectRecord, attributes);
+        Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, 
httpConnectRecord, attributes, connectRecord);
         // store the received data
         return responseFuture.onComplete(arr -> {
             // get tryEvent from attributes
@@ -260,8 +261,7 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
             .code(response != null ? response.statusCode() : -1)
             .message(msg)
             .receivedTime(LocalDateTime.now())
-            .httpRecordId(httpConnectRecord.getHttpRecordId())
-            .recordId(httpConnectRecord.getData().getRecordId())
+            .recordId(httpConnectRecord.getHttpRecordId())
             .retryNum(retryEvent.getCurrentRetries())
             .build();
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
index 2fe7399da..b6432672a 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.connector.http.source.data;
 import java.io.Serializable;
 import java.util.Map;
 
+import io.vertx.ext.web.RoutingContext;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -42,4 +43,6 @@ public class WebhookRequest implements Serializable {
 
     private Object payload;
 
+    private RoutingContext routingContext;
+
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
index 738f04523..e3ceda55e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -19,21 +19,23 @@ package 
org.apache.eventmesh.connector.http.source.protocol.impl;
 
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.connector.http.source.data.CommonResponse;
 import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
 import org.apache.eventmesh.connector.http.source.protocol.Protocol;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
+import java.util.Base64;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.http.HttpMethod;
+import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.handler.BodyHandler;
 
-
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -69,12 +71,13 @@ public class CommonProtocol implements Protocol {
             .handler(BodyHandler.create())
             .handler(ctx -> {
                 // Get the payload
-                String payloadStr = 
ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+                Object payload = 
ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+                payload = JsonUtils.parseObject(payload.toString(), 
String.class);
 
                 // Create and store the webhook request
                 Map<String, String> headerMap = 
ctx.request().headers().entries().stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-                WebhookRequest webhookRequest = new 
WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, 
payloadStr);
+                WebhookRequest webhookRequest = new 
WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payload, 
ctx);
                 if (!queue.offer(webhookRequest)) {
                     throw new IllegalStateException("Failed to store the 
request.");
                 }
@@ -110,7 +113,27 @@ public class CommonProtocol implements Protocol {
         ConnectRecord connectRecord = new ConnectRecord(null, null, 
System.currentTimeMillis(), request.getPayload());
         connectRecord.addExtension("source", request.getProtocolName());
         connectRecord.addExtension("url", request.getUrl());
-        connectRecord.addExtension("headers", request.getHeaders());
+        request.getHeaders().forEach((k, v) -> {
+            if (k.equalsIgnoreCase("extension")) {
+                JsonObject extension = new JsonObject(v);
+                extension.forEach(e -> connectRecord.addExtension(e.getKey(), 
e.getValue()));
+            }
+        });
+        // check recordUniqueId
+        if (!connectRecord.getExtensions().containsKey("recordUniqueId")) {
+            connectRecord.addExtension("recordUniqueId", 
connectRecord.getRecordId());
+        }
+
+        // check data
+        if (connectRecord.getExtension("isBase64") != null) {
+            if (Boolean.parseBoolean(connectRecord.getExtension("isBase64"))) {
+                byte[] data = 
Base64.getDecoder().decode(connectRecord.getData().toString());
+                connectRecord.setData(data);
+            }
+        }
+        if (request.getRoutingContext() != null) {
+            connectRecord.addExtension("routingContext", 
request.getRoutingContext());
+        }
         return connectRecord;
     }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
index e86efcbf3..fac8c0d80 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
@@ -132,7 +132,7 @@ public class GitHubProtocol implements Protocol {
                 // Create and store the webhook request
                 Map<String, String> headerMap = headers.entries().stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-                WebhookRequest webhookRequest = new 
WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, 
payloadStr);
+                WebhookRequest webhookRequest = new 
WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, 
payloadStr, ctx);
 
                 if (!queue.offer(webhookRequest)) {
                     throw new IllegalStateException("Failed to store the 
request.");
diff --git 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
similarity index 63%
copy from 
eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
copy to 
eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
index 82d5c94dd..d62ff1199 100644
--- 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.openconnect.api.ConnectorCreateService
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,11 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+#
+
 
-org.apache.eventmesh.common.remote.request.FetchJobRequest
-org.apache.eventmesh.common.remote.response.FetchJobResponse
-org.apache.eventmesh.common.remote.request.ReportPositionRequest
-org.apache.eventmesh.common.remote.request.ReportVerifyRequest
-org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
-org.apache.eventmesh.common.remote.request.FetchPositionRequest
-org.apache.eventmesh.common.remote.response.FetchPositionResponse
\ No newline at end of file
+HTTP-Source=org.apache.eventmesh.connector.http.source.HttpSourceConnector
+HTTP-Sink=org.apache.eventmesh.connector.http.sink.HttpSinkConnector
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 7ddba511c..5f65f0749 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -20,8 +20,8 @@ package org.apache.eventmesh.connector.http.sink;
 
 import static org.mockserver.model.HttpRequest.request;
 
-import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
-import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpSinkConfig;
+import org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import org.apache.eventmesh.openconnect.util.ConfigUtil;
 
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
index cf1b85347..d84dcfbb9 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/SinkConnectorContext.java
@@ -21,6 +21,8 @@ import 
org.apache.eventmesh.common.config.connector.SinkConfig;
 
 import lombok.Data;
 
+import java.util.Map;
+
 /**
  * Sink Connector Context
  */
@@ -29,4 +31,6 @@ public class SinkConnectorContext implements ConnectorContext 
{
 
     public SinkConfig sinkConfig;
 
+    public Map<String, Object> runtimeConfig;
+
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 977661b13..993352a97 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -112,6 +112,8 @@ public class AdminOffsetService implements 
OffsetManagementService {
 
         reportPositionRequest.setRecordPositionList(recordToSyncList);
 
+        log.debug("start report position request: {}", 
JsonUtils.toJSONString(reportPositionRequest));
+
         Metadata metadata = Metadata.newBuilder()
             .setType(ReportPositionRequest.class.getSimpleName())
             .build();
@@ -121,6 +123,7 @@ public class AdminOffsetService implements 
OffsetManagementService {
                 .build())
             .build();
         requestObserver.onNext(payload);
+        log.debug("end report position request: {}", 
JsonUtils.toJSONString(reportPositionRequest));
 
         for (Map.Entry<RecordPartition, RecordOffset> entry : 
recordMap.entrySet()) {
             positionStore.remove(entry.getKey());
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
index a0390c189..891df482b 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-api/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/api/data/DefaultKeyValue.java
@@ -23,6 +23,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
 public class DefaultKeyValue implements KeyValue {
 
     private final Map<String, Object> properties;
diff --git a/eventmesh-runtime-v2/build.gradle 
b/eventmesh-runtime-v2/build.gradle
index ecba7bffb..04b460ade 100644
--- a/eventmesh-runtime-v2/build.gradle
+++ b/eventmesh-runtime-v2/build.gradle
@@ -35,6 +35,7 @@ dependencies {
     implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
     implementation project(":eventmesh-common")
     implementation project(":eventmesh-connectors:eventmesh-connector-canal")
+    implementation project(":eventmesh-connectors:eventmesh-connector-http")
     implementation project(":eventmesh-meta:eventmesh-meta-api")
     implementation project(":eventmesh-meta:eventmesh-meta-nacos")
     implementation project(":eventmesh-registry:eventmesh-registry-api")
diff --git 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
index 501f222fd..0f7d4e4ed 100644
--- 
a/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
+++ 
b/eventmesh-runtime-v2/src/main/java/org/apache/eventmesh/runtime/connector/ConnectorRuntime.java
@@ -31,8 +31,10 @@ import 
org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.Ad
 import 
org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc.AdminServiceStub;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.JobState;
 import org.apache.eventmesh.common.remote.request.FetchJobRequest;
 import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
 import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
 import org.apache.eventmesh.common.remote.response.FetchJobResponse;
 import org.apache.eventmesh.common.utils.IPUtils;
@@ -129,10 +131,14 @@ public class ConnectorRuntime implements Runtime {
 
     private final ScheduledExecutorService heartBeatExecutor = 
Executors.newSingleThreadScheduledExecutor();
 
+    private final ExecutorService reportVerifyExecutor = 
Executors.newSingleThreadExecutor();
+
     private final BlockingQueue<ConnectRecord> queue;
 
     private volatile boolean isRunning = false;
 
+    private volatile boolean isFailed = false;
+
     public static final String CALLBACK_EXTENSION = "callBackExtension";
 
     private String adminServerAddr;
@@ -207,6 +213,8 @@ public class ConnectorRuntime implements Runtime {
         FetchJobResponse jobResponse = fetchJobConfig();
 
         if (jobResponse == null) {
+            isFailed = true;
+            stop();
             throw new RuntimeException("fetch job config fail");
         }
 
@@ -258,8 +266,11 @@ public class ConnectorRuntime implements Runtime {
         SinkConfig sinkConfig = (SinkConfig) 
ConfigUtil.parse(connectorRuntimeConfig.getSinkConnectorConfig(), 
sinkConnector.configClass());
         SinkConnectorContext sinkConnectorContext = new SinkConnectorContext();
         sinkConnectorContext.setSinkConfig(sinkConfig);
+        
sinkConnectorContext.setRuntimeConfig(connectorRuntimeConfig.getRuntimeConfig());
         sinkConnector.init(sinkConnectorContext);
 
+        reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.INIT);
+
     }
 
     private FetchJobResponse fetchJobConfig() {
@@ -306,6 +317,7 @@ public class ConnectorRuntime implements Runtime {
             try {
                 startSinkConnector();
             } catch (Exception e) {
+                isFailed = true;
                 log.error("sink connector [{}] start fail", 
sinkConnector.name(), e);
                 try {
                     this.stop();
@@ -320,6 +332,7 @@ public class ConnectorRuntime implements Runtime {
             try {
                 startSourceConnector();
             } catch (Exception e) {
+                isFailed = true;
                 log.error("source connector [{}] start fail", 
sourceConnector.name(), e);
                 try {
                     this.stop();
@@ -329,15 +342,25 @@ public class ConnectorRuntime implements Runtime {
                 throw new RuntimeException(e);
             }
         });
+
+        reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.RUNNING);
     }
 
     @Override
     public void stop() throws Exception {
+        log.info("ConnectorRuntime start stop");
+        isRunning = false;
+        if (isFailed) {
+            reportJobRequest(connectorRuntimeConfig.getJobID(), JobState.FAIL);
+        } else {
+            reportJobRequest(connectorRuntimeConfig.getJobID(), 
JobState.COMPLETE);
+        }
         sourceConnector.stop();
         sinkConnector.stop();
         sourceService.shutdown();
         sinkService.shutdown();
         heartBeatExecutor.shutdown();
+        reportVerifyExecutor.shutdown();
         requestObserver.onCompleted();
         if (channel != null && !channel.isShutdown()) {
             channel.shutdown();
@@ -351,6 +374,10 @@ public class ConnectorRuntime implements Runtime {
             // TODO: use producer pub record to storage replace below
             if (connectorRecordList != null && !connectorRecordList.isEmpty()) 
{
                 for (ConnectRecord record : connectorRecordList) {
+                    // check recordUniqueId
+                    if (record.getExtensions() == null || 
!record.getExtensions().containsKey("recordUniqueId")) {
+                        record.addExtension("recordUniqueId", 
record.getRecordId());
+                    }
 
                     queue.put(record);
 
@@ -364,10 +391,18 @@ public class ConnectorRuntime implements Runtime {
                     record.setCallback(new SendMessageCallback() {
                         @Override
                         public void onSuccess(SendResult result) {
+                            log.debug("send record to sink callback success, 
record: {}", record);
                             // commit record
                             sourceConnector.commit(record);
-                            Optional<RecordOffsetManagement.SubmittedPosition> 
submittedRecordPosition = prepareToUpdateRecordOffset(record);
-                            
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+                            if (record.getPosition() != null) {
+                                
Optional<RecordOffsetManagement.SubmittedPosition> submittedRecordPosition = 
prepareToUpdateRecordOffset(record);
+                                
submittedRecordPosition.ifPresent(RecordOffsetManagement.SubmittedPosition::ack);
+                                log.debug("start wait all messages to commit");
+                                offsetManagement.awaitAllMessages(5000, 
TimeUnit.MILLISECONDS);
+                                // update & commit offset
+                                updateCommittableOffsets();
+                                commitOffsets();
+                            }
                             Optional<SendMessageCallback> callback =
                                 
Optional.ofNullable(record.getExtensionObj(CALLBACK_EXTENSION)).map(v -> 
(SendMessageCallback) v);
                             callback.ifPresent(cb -> 
cb.onSuccess(convertToSendResult(record)));
@@ -375,6 +410,7 @@ public class ConnectorRuntime implements Runtime {
 
                         @Override
                         public void onException(SendExceptionContext 
sendExceptionContext) {
+                            isFailed = true;
                             // handle exception
                             sourceConnector.onException(record);
                             log.error("send record to sink callback exception, 
process will shut down, record: {}", record,
@@ -386,11 +422,6 @@ public class ConnectorRuntime implements Runtime {
                             }
                         }
                     });
-
-                    offsetManagement.awaitAllMessages(5000, 
TimeUnit.MILLISECONDS);
-                    // update & commit offset
-                    updateCommittableOffsets();
-                    commitOffsets();
                 }
             }
         }
@@ -406,24 +437,47 @@ public class ConnectorRuntime implements Runtime {
     }
 
     private void reportVerifyRequest(ConnectRecord record, 
ConnectorRuntimeConfig connectorRuntimeConfig, ConnectorStage connectorStage) {
-        String md5Str = md5(record.toString());
-        ReportVerifyRequest reportVerifyRequest = new ReportVerifyRequest();
-        reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
-        reportVerifyRequest.setRecordID(record.getRecordId());
-        reportVerifyRequest.setRecordSig(md5Str);
-        reportVerifyRequest.setConnectorName(
-            IPUtils.getLocalAddress() + "_" + 
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
-        reportVerifyRequest.setConnectorStage(connectorStage.name());
-        
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
-
-        Metadata metadata = 
Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
-
-        Payload request = Payload.newBuilder().setMetadata(metadata)
-            
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
-                .build())
-            .build();
+        reportVerifyExecutor.submit(() -> {
+            try {
+                // use record data + recordUniqueId for md5
+                String md5Str = md5(record.getData().toString() + 
record.getExtension("recordUniqueId"));
+                ReportVerifyRequest reportVerifyRequest = new 
ReportVerifyRequest();
+                
reportVerifyRequest.setTaskID(connectorRuntimeConfig.getTaskID());
+                
reportVerifyRequest.setJobID(connectorRuntimeConfig.getJobID());
+                reportVerifyRequest.setRecordID(record.getRecordId());
+                reportVerifyRequest.setRecordSig(md5Str);
+                reportVerifyRequest.setConnectorName(
+                    IPUtils.getLocalAddress() + "_" + 
connectorRuntimeConfig.getJobID() + "_" + connectorRuntimeConfig.getRegion());
+                reportVerifyRequest.setConnectorStage(connectorStage.name());
+                
reportVerifyRequest.setPosition(JsonUtils.toJSONString(record.getPosition()));
+
+                Metadata metadata = 
Metadata.newBuilder().setType(ReportVerifyRequest.class.getSimpleName()).build();
+
+                Payload request = Payload.newBuilder().setMetadata(metadata)
+                    
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportVerifyRequest))))
+                        .build())
+                    .build();
+
+                requestObserver.onNext(request);
+            } catch (Exception e) {
+                log.error("Failed to report verify request", e);
+            }
+        });
+    }
 
-        requestObserver.onNext(request);
+    private void reportJobRequest(String jobId, JobState jobState) throws 
InterruptedException {
+        ReportJobRequest reportJobRequest = new ReportJobRequest();
+        reportJobRequest.setJobID(jobId);
+        reportJobRequest.setState(jobState);
+        Metadata metadata = Metadata.newBuilder()
+                .setType(ReportJobRequest.class.getSimpleName())
+                .build();
+        Payload payload = Payload.newBuilder()
+                .setMetadata(metadata)
+                
.setBody(Any.newBuilder().setValue(UnsafeByteOperations.unsafeWrap(Objects.requireNonNull(JsonUtils.toJSONBytes(reportJobRequest))))
+                        .build())
+                .build();
+        requestObserver.onNext(payload);
     }
 
     private String md5(String input) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to