This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 9701f0266 [ISSUE #5079] Enhancement update for admin-server (#5080)
9701f0266 is described below

commit 9701f02660ba04ff37bd0b5787b614a3c91d8bfd
Author: mike_xwm <[email protected]>
AuthorDate: Wed Aug 21 11:26:23 2024 +0800

    [ISSUE #5079] Enhancement update for admin-server (#5080)
    
    * [ISSUE #5079] Enhancement update for admin-server
    
    * fix check style error
    
    * fix check style error
---
 eventmesh-admin-server/bin/start-admin.sh          | 51 ++++++++-------
 eventmesh-admin-server/build.gradle                |  2 +
 eventmesh-admin-server/conf/application.yaml       |  8 +--
 eventmesh-admin-server/conf/eventmesh.sql          |  2 +-
 .../conf/mapper/EventMeshVerifyMapper.xml          |  5 +-
 .../eventmesh/admin/server/web/HttpServer.java     | 23 +++++++
 .../admin/server/web/db/DBThreadPool.java          | 26 +++++++-
 .../server/web/db/entity/EventMeshVerify.java      |  3 +
 .../web/handler/impl/FetchJobRequestHandler.java   |  2 +-
 ...fyHandler.java => ReportJobRequestHandler.java} | 30 ++++++---
 .../web/handler/impl/ReportPositionHandler.java    |  2 +
 .../web/handler/impl/ReportVerifyHandler.java      | 49 +++++++++++++-
 .../server/web/service/job/JobInfoBizService.java  | 74 +++++++++++++++++++++-
 .../service/position/impl/HttpPositionHandler.java | 61 ++++++++++++++++++
 .../web/service/verify/VerifyBizService.java       |  2 +
 .../apache/eventmesh/common/remote/JobState.java   | 52 +++++++++++++++
 .../eventmesh/common/remote/TransportType.java     |  1 +
 ...ortVerifyRequest.java => ReportJobRequest.java} | 15 ++---
 .../common/remote/request/ReportVerifyRequest.java |  2 +
 ...apache.eventmesh.common.remote.payload.IPayload |  1 +
 .../offsetmgmt/admin/AdminOffsetService.java       |  3 +
 21 files changed, 353 insertions(+), 61 deletions(-)

diff --git a/eventmesh-admin-server/bin/start-admin.sh 
b/eventmesh-admin-server/bin/start-admin.sh
index 93c364439..163303661 100644
--- a/eventmesh-admin-server/bin/start-admin.sh
+++ b/eventmesh-admin-server/bin/start-admin.sh
@@ -56,34 +56,34 @@ function extract_java_version {
 #}
 
 function get_pid {
-       local ppid=""
-       if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
-               ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
-               # If the process does not exist, it indicates that the previous 
process terminated abnormally.
+        local ppid=""
+        if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
+                ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
+                # If the process does not exist, it indicates that the 
previous process terminated abnormally.
     if [ ! -d /proc/$ppid ]; then
       # Remove the residual file.
       rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
       echo -e "ERROR\t EventMesh process had already terminated unexpectedly 
before, please check log output."
       ppid=""
     fi
-       else
-               if [[ $OS =~ Msys ]]; then
-                       # There is a Bug on Msys that may not be able to kill 
the identified process
-                       ppid=`jps -v | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v 
grep | awk -F ' ' {'print $1'}`
-               elif [[ $OS =~ Darwin ]]; then
-                       # Known problem: grep Java may not be able to 
accurately identify Java processes
-                       ppid=$(/bin/ps -o user,pid,command | grep "java" | grep 
-i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" 
|awk -F ' ' {'print $2'})
-               else
-                 if [ $DOCKER ]; then
-                   # No need to exclude root user in Docker containers.
-                   ppid=$(ps -C java -o user,pid,command --cols 99999 | grep 
-w $EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print 
$2'})
-                 else
+        else
+                if [[ $OS =~ Msys ]]; then
+                        # There is a Bug on Msys that may not be able to kill 
the identified process
+                        ppid=`jps -v | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v 
grep | awk -F ' ' {'print $1'}`
+                elif [[ $OS =~ Darwin ]]; then
+                        # Known problem: grep Java may not be able to 
accurately identify Java processes
+                        ppid=$(/bin/ps -o user,pid,command | grep "java" | 
grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev 
"^root" |awk -F ' ' {'print $2'})
+                else
+                  if [ $DOCKER ]; then
+                    # No need to exclude root user in Docker containers.
+                    ppid=$(ps -C java -o user,pid,command --cols 99999 | grep 
-w $EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print 
$2'})
+                  else
         # It is required to identify the process as accurately as possible on 
Linux.
         ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w 
$EVENTMESH_ADMIN_HOME | grep -i 
"org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk 
-F ' ' {'print $2'})
       fi
-               fi
-       fi
-       echo "$ppid";
+                fi
+        fi
+        echo "$ppid";
 }
 
 
#===========================================================================================
@@ -136,8 +136,7 @@ export JAVA_HOME
 
 GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
 
-#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m 
-XX:SurvivorRatio=4"
-JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep 
APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
+JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
 JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m 
-XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 
-XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
 JAVA_OPT="${JAVA_OPT} -verbose:gc"
 if [[ "$JAVA_VERSION" == "8" ]]; then
@@ -172,7 +171,7 @@ JAVA_OPT="${JAVA_OPT} 
-DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
 #            echo "proxy is running already"
 #            exit 9;
 #        else
-#          echo "err pid$pid, rm pid.file"
+#           echo "err pid$pid, rm pid.file"
 #            rm pid.file
 #        fi
 #fi
@@ -183,8 +182,8 @@ if [[ $pid == "ERROR"* ]]; then
   exit 9
 fi
 if [ -n "$pid" ]; then
-       echo -e "ERROR\t The server is already running (pid=$pid), there is no 
need to execute start.sh again."
-       exit 9
+        echo -e "ERROR\t The server is already running (pid=$pid), there is no 
need to execute start.sh again."
+        exit 9
 fi
 
 make_logs_dir
@@ -193,9 +192,9 @@ echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> 
${EVENTMESH_ADMIN_LOG_H
 
 EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
 if [ $DOCKER ]; then
-       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
+        $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
 else
-       $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
+        $JAVA $JAVA_OPT -classpath 
${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/*
 $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
 echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
 fi
 exit 0
diff --git a/eventmesh-admin-server/build.gradle 
b/eventmesh-admin-server/build.gradle
index 1fec2c7c5..95c8fa137 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -38,6 +38,8 @@ dependencies {
        implementation "com.alibaba:druid-spring-boot-starter"
     compileOnly 'com.mysql:mysql-connector-j'
        compileOnly 'org.projectlombok:lombok'
+    testImplementation 'junit:junit:4.12'
+    testImplementation 'org.projectlombok:lombok'
        annotationProcessor 'org.projectlombok:lombok'
 }
 
diff --git a/eventmesh-admin-server/conf/application.yaml 
b/eventmesh-admin-server/conf/application.yaml
index 274196db6..3d702e579 100644
--- a/eventmesh-admin-server/conf/application.yaml
+++ b/eventmesh-admin-server/conf/application.yaml
@@ -35,8 +35,8 @@ event-mesh:
     # grpc server port
     port: 8081
     adminServerList:
-      region1:
+      R1:
         - http://localhost:8082
-      region2:
-        - http://localhost:8083
-    region: region1
\ No newline at end of file
+      R2:
+        - http://localhost:8082
+    region: R1
\ No newline at end of file
diff --git a/eventmesh-admin-server/conf/eventmesh.sql 
b/eventmesh-admin-server/conf/eventmesh.sql
index 986320570..6e28daca8 100644
--- a/eventmesh-admin-server/conf/eventmesh.sql
+++ b/eventmesh-admin-server/conf/eventmesh.sql
@@ -102,7 +102,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
   `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
   `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`),
-  UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
   KEY `jobID` (`jobID`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
 
@@ -137,6 +136,7 @@ CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
 CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
   `id` int unsigned NOT NULL AUTO_INCREMENT,
   `taskID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
+  `jobID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
   `recordID` varchar(50) COLLATE utf8_bin DEFAULT NULL,
   `recordSig` varchar(50) COLLATE utf8_bin DEFAULT NULL,
   `connectorName` varchar(200) COLLATE utf8_bin DEFAULT NULL,
diff --git a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml 
b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
index b7b042145..45727498c 100644
--- a/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
+++ b/eventmesh-admin-server/conf/mapper/EventMeshVerifyMapper.xml
@@ -26,6 +26,7 @@
     <resultMap id="BaseResultMap" 
type="org.apache.eventmesh.admin.server.web.db.entity.EventMeshVerify">
             <id property="id" column="id" jdbcType="INTEGER"/>
             <result property="taskID" column="taskID" jdbcType="VARCHAR"/>
+            <result property="jobID" column="jobID" jdbcType="VARCHAR"/>
             <result property="recordID" column="recordID" jdbcType="VARCHAR"/>
             <result property="recordSig" column="recordSig" 
jdbcType="VARCHAR"/>
             <result property="connectorName" column="connectorName" 
jdbcType="VARCHAR"/>
@@ -35,8 +36,8 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id,taskID,recordID,
-        recordSig,connectorName,connectorStage,
+        id,taskID,jobID,recordID,
+        recordSig,connectorName,connectorStage,
         position,createTime
     </sql>
 </mapper>
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
index 12afb3a3d..2454e9f02 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/HttpServer.java
@@ -18,7 +18,9 @@
 package org.apache.eventmesh.admin.server.web;
 
 import org.apache.eventmesh.admin.server.web.service.task.TaskBizService;
+import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
 import org.apache.eventmesh.common.remote.request.CreateTaskRequest;
+import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
 import org.apache.eventmesh.common.remote.response.CreateTaskResponse;
 import org.apache.eventmesh.common.utils.JsonUtils;
 
@@ -29,19 +31,40 @@ import 
org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
+import lombok.extern.slf4j.Slf4j;
+
 @RestController
 @RequestMapping("/eventmesh/admin")
+@Slf4j
 public class HttpServer {
 
     @Autowired
     private TaskBizService taskService;
 
+    @Autowired
+    private VerifyBizService verifyService;
+
     @RequestMapping(value = "/createTask", method = RequestMethod.POST)
     public ResponseEntity<Object> createOrUpdateTask(@RequestBody 
CreateTaskRequest task) {
+        log.info("receive http proto create task:{}", task);
         CreateTaskResponse createTaskResponse = taskService.createTask(task);
+        log.info("receive http proto create task result:{}", 
createTaskResponse);
         return 
ResponseEntity.ok(JsonUtils.toJSONString(Response.success(createTaskResponse)));
     }
 
+
+    @RequestMapping(value = "/reportVerify", method = RequestMethod.POST)
+    public ResponseEntity<Object> reportVerify(@RequestBody 
ReportVerifyRequest request) {
+        log.info("receive http proto report verify request:{}", request);
+        boolean result = verifyService.reportVerifyRecord(request);
+        log.info("receive http proto report verify result:{}", result);
+        if (result) {
+            return ResponseEntity.ok("report verify success.request:" + 
JsonUtils.toJSONString(request));
+        } else {
+            return ResponseEntity.internalServerError().body("report verify 
success.request:" + JsonUtils.toJSONString(request));
+        }
+    }
+
     public boolean deleteTask(Long id) {
         return false;
     }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
index f1de76496..277ea6665 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/DBThreadPool.java
@@ -20,6 +20,7 @@ package org.apache.eventmesh.admin.server.web.db;
 import org.apache.eventmesh.common.EventMeshThreadFactory;
 
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -39,17 +40,34 @@ public class DBThreadPool {
             new LinkedBlockingQueue<>(1000), new 
EventMeshThreadFactory("admin-server-db"),
             new ThreadPoolExecutor.DiscardOldestPolicy());
 
+
+    private final ScheduledThreadPoolExecutor checkScheduledExecutor =
+        new 
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new 
EventMeshThreadFactory("admin-server-check-scheduled"),
+            new ThreadPoolExecutor.DiscardOldestPolicy());
+
     @PreDestroy
     private void destroy() {
         if (!executor.isShutdown()) {
             try {
                 executor.shutdown();
                 if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
-                    log.info("wait heart beat handler thread pool shutdown 
timeout, it will shutdown immediately");
+                    log.info("wait handler thread pool shutdown timeout, it 
will shutdown immediately");
                     executor.shutdownNow();
                 }
             } catch (InterruptedException e) {
-                log.warn("wait heart beat handler thread pool shutdown fail");
+                log.warn("wait handler thread pool shutdown fail");
+            }
+        }
+
+        if (!checkScheduledExecutor.isShutdown()) {
+            try {
+                checkScheduledExecutor.shutdown();
+                if (!checkScheduledExecutor.awaitTermination(30, 
TimeUnit.SECONDS)) {
+                    log.info("wait scheduled thread pool shutdown timeout, it 
will shutdown immediately");
+                    checkScheduledExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                log.warn("wait scheduled thread pool shutdown fail");
             }
         }
     }
@@ -57,4 +75,8 @@ public class DBThreadPool {
     public ThreadPoolExecutor getExecutors() {
         return executor;
     }
+
+    public ScheduledThreadPoolExecutor getCheckExecutor() {
+        return checkScheduledExecutor;
+    }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
index 5425c5c57..c5a6c35f8 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshVerify.java
@@ -32,11 +32,14 @@ import lombok.Data;
 @TableName(value = "event_mesh_verify")
 @Data
 public class EventMeshVerify implements Serializable {
+
     @TableId(type = IdType.AUTO)
     private Integer id;
 
     private String taskID;
 
+    private String jobID;
+
     private String recordID;
 
     private String recordSig;
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
index b377bcddd..3392084c2 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -56,7 +56,7 @@ public class FetchJobRequestHandler extends 
BaseRequestHandler<FetchJobRequest,
         
config.setSourceConnectorConfig(JsonUtils.objectToMap(detail.getSourceDataSource().getConf()));
         config.setSourceConnectorDesc(detail.getSourceConnectorDesc());
         
config.setSinkConnectorConfig(JsonUtils.objectToMap(detail.getSinkDataSource().getConf()));
-        config.setSourceConnectorDesc(detail.getSinkConnectorDesc());
+        config.setSinkConnectorDesc(detail.getSinkConnectorDesc());
         response.setConnectorConfig(config);
         response.setTransportType(detail.getTransportType());
         response.setState(detail.getState());
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
similarity index 54%
copy from 
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
copy to 
eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
index 39963494c..ea836ce7a 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportJobRequestHandler.java
@@ -17,11 +17,12 @@
 
 package org.apache.eventmesh.admin.server.web.handler.impl;
 
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
 import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
-import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
-import org.apache.eventmesh.common.remote.request.ReportVerifyRequest;
+import org.apache.eventmesh.common.remote.request.ReportJobRequest;
 import org.apache.eventmesh.common.remote.response.SimpleResponse;
 
 import org.apache.commons.lang3.StringUtils;
@@ -33,17 +34,26 @@ import lombok.extern.slf4j.Slf4j;
 
 @Component
 @Slf4j
-public class ReportVerifyHandler extends 
BaseRequestHandler<ReportVerifyRequest, SimpleResponse> {
+public class ReportJobRequestHandler extends 
BaseRequestHandler<ReportJobRequest, SimpleResponse> {
+
     @Autowired
-    private VerifyBizService verifyService;
+    JobInfoBizService jobInfoBizService;
 
     @Override
-    protected SimpleResponse handler(ReportVerifyRequest request, Metadata 
metadata) {
-        if (StringUtils.isAnyBlank(request.getTaskID(), 
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
-            log.info("report verify request [{}] illegal", request);
-            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task 
id, sign, record id or stage is none");
+    public SimpleResponse handler(ReportJobRequest request, Metadata metadata) 
{
+        log.info("receive report job request:{}", request);
+        if (StringUtils.isBlank(request.getJobID())) {
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, 
it's empty");
+        }
+        EventMeshJobInfo jobInfo = 
jobInfoBizService.getJobInfo(request.getJobID());
+        if (jobInfo == null) {
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, 
not exist target job,jobID:" + request.getJobID());
+        }
+        boolean result = jobInfoBizService.updateJobState(jobInfo.getJobID(), 
request.getState());
+        if (result) {
+            return SimpleResponse.success();
+        } else {
+            return SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "update job 
failed.");
         }
-        return verifyService.reportVerifyRecord(request) ? 
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify "
-                + "request fail");
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
index 5e2a96826..7a30bef80 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
@@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j;
 @Component
 @Slf4j
 public class ReportPositionHandler extends 
BaseRequestHandler<ReportPositionRequest, SimpleResponse> {
+
     @Autowired
     private JobInfoBizService jobInfoBizService;
 
@@ -48,6 +49,7 @@ public class ReportPositionHandler extends 
BaseRequestHandler<ReportPositionRequ
 
     @Override
     protected SimpleResponse handler(ReportPositionRequest request, Metadata 
metadata) {
+        log.info("receive report position request:{}", request);
         if (StringUtils.isBlank(request.getJobID())) {
             log.info("request [{}] illegal job id", request);
             return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "illegal job id, 
it's empty");
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
index 39963494c..9844f47c6 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/handler/impl/ReportVerifyHandler.java
@@ -17,7 +17,10 @@
 
 package org.apache.eventmesh.admin.server.web.handler.impl;
 
+import org.apache.eventmesh.admin.server.AdminServerProperties;
+import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
 import org.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import org.apache.eventmesh.admin.server.web.service.job.JobInfoBizService;
 import org.apache.eventmesh.admin.server.web.service.verify.VerifyBizService;
 import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
 import org.apache.eventmesh.common.remote.exception.ErrorCode;
@@ -26,24 +29,64 @@ import 
org.apache.eventmesh.common.remote.response.SimpleResponse;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.List;
+import java.util.Random;
+
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
 
 import lombok.extern.slf4j.Slf4j;
 
 @Component
 @Slf4j
 public class ReportVerifyHandler extends 
BaseRequestHandler<ReportVerifyRequest, SimpleResponse> {
+
     @Autowired
     private VerifyBizService verifyService;
 
+    @Autowired
+    JobInfoBizService jobInfoBizService;
+
+    @Autowired
+    private AdminServerProperties properties;
+
     @Override
     protected SimpleResponse handler(ReportVerifyRequest request, Metadata 
metadata) {
-        if (StringUtils.isAnyBlank(request.getTaskID(), 
request.getRecordSig(), request.getRecordID(), request.getConnectorStage())) {
+        if (StringUtils.isAnyBlank(request.getTaskID(), request.getJobID(), 
request.getRecordSig(), request.getRecordID(),
+            request.getConnectorStage())) {
             log.info("report verify request [{}] illegal", request);
-            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task 
id, sign, record id or stage is none");
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "request task 
id,job id, sign, record id or stage is none");
+        }
+
+        String jobID = request.getJobID();
+        EventMeshJobInfo jobInfo = jobInfoBizService.getJobInfo(jobID);
+        if (jobInfo == null || StringUtils.isBlank(jobInfo.getFromRegion())) {
+            log.info("report verify job info [{}] illegal", request);
+            return SimpleResponse.fail(ErrorCode.BAD_REQUEST, "job info is 
null or fromRegion is blank,job id:" + jobID);
         }
-        return verifyService.reportVerifyRecord(request) ? 
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify "
+
+        String fromRegion = jobInfo.getFromRegion();
+        String localRegion = properties.getRegion();
+        log.info("report verify request from 
region:{},localRegion:{},request:{}", fromRegion, localRegion, request);
+        if (fromRegion.equalsIgnoreCase(localRegion)) {
+            return verifyService.reportVerifyRecord(request) ? 
SimpleResponse.success() : SimpleResponse.fail(ErrorCode.INTERNAL_ERR, "save 
verify "
                 + "request fail");
+        } else {
+            log.info("start transfer report verify to from region admin 
server. from region:{}", fromRegion);
+            List<String> adminServerList = 
properties.getAdminServerList().get(fromRegion);
+            if (adminServerList == null || adminServerList.isEmpty()) {
+                throw new RuntimeException("No admin server available for 
region: " + fromRegion);
+            }
+            String targetUrl = adminServerList.get(new 
Random().nextInt(adminServerList.size())) + "/eventmesh/admin/reportVerify";
+            RestTemplate restTemplate = new RestTemplate();
+            ResponseEntity<String> response = 
restTemplate.postForEntity(targetUrl, request, String.class);
+            if (!response.getStatusCode().is2xxSuccessful()) {
+                return SimpleResponse.fail(ErrorCode.INTERNAL_ERR,
+                    "save verify request fail,code:" + 
response.getStatusCode() + ",msg:" + response.getBody());
+            }
+            return SimpleResponse.success();
+        }
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
index 0657383e2..a8b469d8b 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java
@@ -19,15 +19,19 @@ package org.apache.eventmesh.admin.server.web.service.job;
 
 import org.apache.eventmesh.admin.server.AdminServerProperties;
 import org.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import org.apache.eventmesh.admin.server.web.db.DBThreadPool;
 import org.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
 import org.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import 
org.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoExtService;
 import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
+import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
 import org.apache.eventmesh.admin.server.web.pojo.JobDetail;
 import 
org.apache.eventmesh.admin.server.web.service.datasource.DataSourceBizService;
 import 
org.apache.eventmesh.admin.server.web.service.position.PositionBizService;
 import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.remote.JobState;
 import org.apache.eventmesh.common.remote.TaskState;
 import org.apache.eventmesh.common.remote.TransportType;
 import org.apache.eventmesh.common.remote.datasource.DataSource;
@@ -38,9 +42,13 @@ import org.apache.eventmesh.common.utils.JsonUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -75,13 +83,41 @@ public class JobInfoBizService {
     @Autowired
     private AdminServerProperties properties;
 
+    @Autowired
+    EventMeshRuntimeHeartbeatService heartbeatService;
+
+    private final long heatBeatPeriod = Duration.ofMillis(5000).toMillis();
+
+    @Autowired
+    DBThreadPool executor;
+
+    @PostConstruct
+    public void init() {
+        log.info("init check job info scheduled task.");
+        executor.getCheckExecutor().scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                checkJobInfo();
+            }
+        }, 10, 10, TimeUnit.SECONDS);
+    }
+
     public boolean updateJobState(String jobID, TaskState state) {
         if (jobID == null || state == null) {
             return false;
         }
         EventMeshJobInfo jobInfo = new EventMeshJobInfo();
         jobInfo.setJobState(state.name());
-        return jobInfoService.update(jobInfo, 
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("state", 
TaskState.DELETE.name()));
+        return jobInfoService.update(jobInfo, 
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState", 
JobState.DELETE.name()));
+    }
+
+    public boolean updateJobState(String jobID, JobState state) {
+        if (jobID == null || state == null) {
+            return false;
+        }
+        EventMeshJobInfo jobInfo = new EventMeshJobInfo();
+        jobInfo.setJobState(state.name());
+        return jobInfoService.update(jobInfo, 
Wrappers.<EventMeshJobInfo>update().eq("jobID", jobID).ne("jobState", 
JobState.DELETE.name()));
     }
 
     @Transactional
@@ -114,7 +150,8 @@ public class JobInfoBizService {
             source.setOperator(job.getCreateUid());
             source.setRegion(job.getSourceDataSource().getRegion());
             source.setDesc(job.getSourceConnectorDesc());
-            source.setConfig(job.getSourceDataSource().getConf());
+            Config sourceConfig = job.getSourceDataSource().getConf();
+            source.setConfig(sourceConfig);
             
source.setConfigClass(job.getSourceDataSource().getConfClazz().getName());
             EventMeshDataSource createdSource = 
dataSourceBizService.createDataSource(source);
             entity.setSourceData(createdSource.getId());
@@ -124,7 +161,8 @@ public class JobInfoBizService {
             sink.setOperator(job.getCreateUid());
             sink.setRegion(job.getSinkDataSource().getRegion());
             sink.setDesc(job.getSinkConnectorDesc());
-            sink.setConfig(job.getSinkDataSource().getConf());
+            Config sinkConfig = job.getSinkDataSource().getConf();
+            sink.setConfig(sinkConfig);
             
sink.setConfigClass(job.getSinkDataSource().getConfClazz().getName());
             EventMeshDataSource createdSink = 
dataSourceBizService.createDataSource(sink);
             entity.setTargetData(createdSink.getId());
@@ -195,6 +233,36 @@ public class JobInfoBizService {
         
detail.setTransportType(TransportType.getTransportType(job.getTransportType()));
         return detail;
     }
+
+    public EventMeshJobInfo getJobInfo(String jobID) {
+        if (jobID == null) {
+            return null;
+        }
+        EventMeshJobInfo job = 
jobInfoService.getOne(Wrappers.<EventMeshJobInfo>query().eq("jobID", jobID));
+        return job;
+    }
+
+    public void checkJobInfo() {
+        List<EventMeshJobInfo> eventMeshJobInfoList = 
jobInfoService.list(Wrappers.<EventMeshJobInfo>query().eq("jobState", 
JobState.RUNNING.name()));
+        log.info("start check job info.to check job size:{}", 
eventMeshJobInfoList.size());
+        for (EventMeshJobInfo jobInfo : eventMeshJobInfoList) {
+            String jobID = jobInfo.getJobID();
+            if (StringUtils.isEmpty(jobID)) {
+                continue;
+            }
+            EventMeshRuntimeHeartbeat heartbeat = 
heartbeatService.getOne(Wrappers.<EventMeshRuntimeHeartbeat>query().eq("jobID", 
jobID));
+            if (heartbeat == null) {
+                continue;
+            }
+            // if last heart beat update time have delay three period.print 
job heart beat delay warn
+            long currentTimeStamp = System.currentTimeMillis();
+            if (currentTimeStamp - heartbeat.getUpdateTime().getTime() > 3 * 
heatBeatPeriod) {
+                log.warn("current job heart heart has 
delay.jobID:{},currentTimeStamp:{},last update time:{}", jobID, 
currentTimeStamp,
+                    heartbeat.getUpdateTime());
+            }
+        }
+    }
+
 }
 
 
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
new file mode 100644
index 000000000..b8d536f38
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/HttpPositionHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.admin.server.web.service.position.impl;
+
+import 
org.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
+import org.apache.eventmesh.admin.server.web.service.position.PositionHandler;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.datasource.DataSourceType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Component
+@Slf4j
+public class HttpPositionHandler extends PositionHandler {
+
+    @Autowired
+    EventMeshPositionReporterHistoryService historyService;
+
+    @Override
+    protected DataSourceType getSourceType() {
+        return DataSourceType.HTTP;
+    }
+
+    @Override
+    public boolean handler(ReportPositionRequest request, Metadata metadata) {
+        log.info("receive http position report request:{}", request);
+        // mock wemq postion report store
+        return true;
+    }
+
+    @Override
+    public List<RecordPosition> handler(FetchPositionRequest request, Metadata 
metadata) {
+        // mock http position fetch request
+        List<RecordPosition> recordPositionList = new ArrayList<>();
+        return recordPositionList;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
index 74f208b19..e4f08b30c 100644
--- 
a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
+++ 
b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/verify/VerifyBizService.java
@@ -26,6 +26,7 @@ import org.springframework.stereotype.Service;
 
 @Service
 public class VerifyBizService {
+
     @Autowired
     private EventMeshVerifyService verifyService;
 
@@ -35,6 +36,7 @@ public class VerifyBizService {
         verify.setRecordSig(request.getRecordSig());
         verify.setPosition(request.getPosition());
         verify.setTaskID(request.getTaskID());
+        verify.setJobID(request.getJobID());
         verify.setConnectorName(request.getConnectorName());
         verify.setConnectorStage(request.getConnectorStage());
         return verifyService.save(verify);
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
new file mode 100644
index 000000000..da9daffe9
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/JobState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.ToString;
+
+@ToString
+public enum JobState {
+    INIT, RUNNING, COMPLETE, DELETE, FAIL;
+    private static final JobState[] STATES_NUM_INDEX = JobState.values();
+    private static final Map<String, JobState> STATES_NAME_INDEX = new 
HashMap<>();
+
+    static {
+        for (JobState jobState : STATES_NUM_INDEX) {
+            STATES_NAME_INDEX.put(jobState.name(), jobState);
+        }
+    }
+
+    public static JobState fromIndex(Integer index) {
+        if (index == null || index < 0 || index >= STATES_NUM_INDEX.length) {
+            return null;
+        }
+
+        return STATES_NUM_INDEX[index];
+    }
+
+    public static JobState fromIndex(String index) {
+        if (index == null || index.isEmpty()) {
+            return null;
+        }
+
+        return STATES_NAME_INDEX.get(index);
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
index 82e7bc021..6b4359839 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/TransportType.java
@@ -35,6 +35,7 @@ public enum TransportType {
     HTTP_REDIS(DataSourceType.HTTP, DataSourceType.REDIS),
     HTTP_ROCKETMQ(DataSourceType.HTTP, DataSourceType.ROCKETMQ),
     REDIS_MQ(DataSourceType.REDIS, DataSourceType.ROCKETMQ),
+    HTTP_HTTP(DataSourceType.HTTP, DataSourceType.HTTP),
     ;
     private static final Map<String, TransportType> INDEX_TYPES = new 
HashMap<>();
     private static final TransportType[] TYPES = TransportType.values();
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
similarity index 80%
copy from 
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
copy to 
eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
index cd541949f..aec33e461 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportJobRequest.java
@@ -17,6 +17,8 @@
 
 package org.apache.eventmesh.common.remote.request;
 
+import org.apache.eventmesh.common.remote.JobState;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -24,17 +26,12 @@ import lombok.ToString;
 @Data
 @EqualsAndHashCode(callSuper = true)
 @ToString
-public class ReportVerifyRequest extends BaseRemoteRequest {
-
-    private String taskID;
-
-    private String recordID;
+public class ReportJobRequest extends BaseRemoteRequest {
 
-    private String recordSig;
+    private String jobID;
 
-    private String connectorName;
+    private JobState state;
 
-    private String connectorStage;
+    private String address;
 
-    private String position;
 }
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
index cd541949f..bd38881c3 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/request/ReportVerifyRequest.java
@@ -28,6 +28,8 @@ public class ReportVerifyRequest extends BaseRemoteRequest {
 
     private String taskID;
 
+    private String jobID;
+
     private String recordID;
 
     private String recordSig;
diff --git 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
 
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
index 82d5c94dd..433cf57ed 100644
--- 
a/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
+++ 
b/eventmesh-common/src/main/resources/META-INF/services/org.apache.eventmesh.common.remote.payload.IPayload
@@ -16,6 +16,7 @@
 org.apache.eventmesh.common.remote.request.FetchJobRequest
 org.apache.eventmesh.common.remote.response.FetchJobResponse
 org.apache.eventmesh.common.remote.request.ReportPositionRequest
+org.apache.eventmesh.common.remote.request.ReportJobRequest
 org.apache.eventmesh.common.remote.request.ReportVerifyRequest
 org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest
 org.apache.eventmesh.common.remote.request.FetchPositionRequest
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
index 977661b13..993352a97 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-offsetmgmt-plugin/eventmesh-openconnect-offsetmgmt-admin/src/main/java/org/apache/eventmesh/openconnect/offsetmgmt/admin/AdminOffsetService.java
@@ -112,6 +112,8 @@ public class AdminOffsetService implements 
OffsetManagementService {
 
         reportPositionRequest.setRecordPositionList(recordToSyncList);
 
+        log.debug("start report position request: {}", 
JsonUtils.toJSONString(reportPositionRequest));
+
         Metadata metadata = Metadata.newBuilder()
             .setType(ReportPositionRequest.class.getSimpleName())
             .build();
@@ -121,6 +123,7 @@ public class AdminOffsetService implements 
OffsetManagementService {
                 .build())
             .build();
         requestObserver.onNext(payload);
+        log.debug("end report position request: {}", 
JsonUtils.toJSONString(reportPositionRequest));
 
         for (Map.Entry<RecordPartition, RecordOffset> entry : 
recordMap.entrySet()) {
             positionStore.remove(entry.getKey());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to