This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git

commit 3f13070ddd23f9a6001a6ae97d3291d37be3f518
Author: sodaRyCN <[email protected]>
AuthorDate: Wed May 29 10:53:18 2024 +0800

    [ISSUES #4933]Add Admin Module
---
 .../com/apache/eventmesh/admin/server/Admin.java   |  11 +-
 .../eventmesh/admin/server/AdminException.java     |  11 -
 .../apache/eventmesh/admin/server/AdminServer.java |  71 +++++-
 .../admin/server/AdminServerProperties.java        |  16 ++
 .../admin/server/AdminServerRuntimeException.java  |  12 +
 .../eventmesh/admin/server/ComponentLifeCycle.java |   2 +-
 .../server/EventMeshAdminServerConfiguration.java  |  32 ---
 .../eventmesh/admin/server/ExampleAdminServer.java |  14 ++
 .../apache/eventmesh/admin/server/HeartBeat.java   |  14 --
 .../server/constatns/AdminServerConstants.java     |   9 +
 .../apache/eventmesh/admin/server/task/Job.java    |   8 -
 .../eventmesh/admin/server/task/JobState.java      |  10 -
 .../eventmesh/admin/server/task/JobType.java       |   7 -
 .../eventmesh/admin/server/task/Position.java      |   5 -
 .../apache/eventmesh/admin/server/task/Task.java   |  17 --
 .../admin/server/web/AdminGrpcServer.java          |  85 +++++++
 .../eventmesh/admin/server/web/BaseServer.java     |  30 +++
 .../eventmesh/admin/server/web/GrpcServer.java     |  44 +++-
 .../admin/server/web/db/DBThreadPool.java          |  38 +++
 .../server/web/db/entity/EventMeshDataSource.java  |  35 +++
 .../server/web/db/entity/EventMeshJobDetail.java   |  29 +++
 .../server/web/db/entity/EventMeshJobInfo.java     |  41 +++
 .../web/db/entity/EventMeshMysqlPosition.java      |  35 +++
 .../entity/EventMeshPositionReporterHistory.java   |  29 +++
 .../web/db/entity/EventMeshRuntimeHeartbeat.java   |  33 +++
 .../web/db/entity/EventMeshRuntimeHistory.java     |  27 ++
 .../web/db/mapper/EventMeshDataSourceMapper.java   |  20 ++
 .../web/db/mapper/EventMeshJobInfoMapper.java      |  20 ++
 .../db/mapper/EventMeshMysqlPositionMapper.java    |  20 ++
 .../EventMeshPositionReporterHistoryMapper.java    |  20 ++
 .../db/mapper/EventMeshRuntimeHeartbeatMapper.java |  20 ++
 .../db/mapper/EventMeshRuntimeHistoryMapper.java   |  20 ++
 .../web/db/service/EventMeshDataSourceService.java |  13 +
 .../web/db/service/EventMeshJobInfoService.java    |  12 +
 .../db/service/EventMeshMysqlPositionService.java  |  12 +
 .../EventMeshPositionReporterHistoryService.java   |  13 +
 .../service/EventMeshRuntimeHeartbeatService.java  |  12 +
 .../db/service/EventMeshRuntimeHistoryService.java |  13 +
 .../impl/EventMeshDataSourceServiceImpl.java       |  22 ++
 .../service/impl/EventMeshJobInfoServiceImpl.java  |  23 ++
 .../impl/EventMeshMysqlPositionServiceImpl.java    |  23 ++
 ...ventMeshPositionReporterHistoryServiceImpl.java |  22 ++
 .../impl/EventMeshRuntimeHeartbeatServiceImpl.java |  23 ++
 .../impl/EventMeshRuntimeHistoryServiceImpl.java   |  22 ++
 .../web/generated/AdminBiStreamServiceGrpc.java    | 263 -------------------
 .../server/web/generated/AdminServiceGrpc.java     | 279 ---------------------
 .../server/web/handler/BaseRequestHandler.java     |  13 +
 .../server/web/handler/RequestHandlerFactory.java  |  47 ++++
 .../web/handler/impl/FetchJobRequestHandler.java   |  51 ++++
 .../web/handler/impl/FetchPositionHandler.java     |  36 +++
 .../web/handler/impl/ReportHeartBeatHandler.java   |  50 ++++
 .../web/handler/impl/ReportPositionHandler.java    |  81 ++++++
 .../EventMeshRuntimeHeartbeatBizService.java       |  63 +++++
 .../service/job/EventMeshJobInfoBizService.java    | 111 ++++++++
 .../position/EventMeshPositionBizService.java      |  61 +++++
 .../service/position/IFetchPositionHandler.java    |   9 +
 .../service/position/IReportPositionHandler.java   |   8 +
 .../web/service/position/PositionHandler.java      |   7 +
 .../service/position/PositionHandlerFactory.java   |  35 +++
 .../position/impl/MysqlPositionHandler.java        | 140 +++++++++++
 ...eventmesh.admin.server.registry.RegistryService |  16 --
 .../src/main/resources/META-INF/spring.factories   |   2 +
 .../src/main/resources/application.yaml            |  17 +-
 .../src/main/resources/eventmesh-admin.properties  |   2 +
 .../src/main/resources/eventmesh.sql               | 114 +++++++++
 .../resources/mapper/EventMeshDataSourceMapper.xml |  23 ++
 .../resources/mapper/EventMeshJobInfoMapper.xml    |  27 ++
 .../mapper/EventMeshMysqlPositionMapper.xml        |  23 ++
 .../EventMeshPositionReporterHistoryMapper.xml     |  19 ++
 .../mapper/EventMeshRuntimeHeartbeatMapper.xml     |  22 ++
 .../mapper/EventMeshRuntimeHistoryMapper.xml       |  18 ++
 71 files changed, 1841 insertions(+), 691 deletions(-)

diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
index 1090f7b59..9be047edc 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
@@ -1,10 +1,10 @@
 package com.apache.eventmesh.admin.server;
 
+import org.apache.eventmesh.common.remote.Task;
+import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
 import org.apache.eventmesh.common.utils.PagedList;
 
-import com.apache.eventmesh.admin.server.task.Task;
-
-public interface Admin extends ComponentLifeCycle{
+public interface Admin extends ComponentLifeCycle {
     /**
      * support for web or ops
      **/
@@ -17,8 +17,5 @@ public interface Admin extends ComponentLifeCycle{
     /**
      * support for task
      */
-    void reportHeartbeat(HeartBeat heartBeat);
-
-
-
+    void reportHeartbeat(ReportHeartBeatRequest heartBeat);
 }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java
deleted file mode 100644
index eca5eeb0d..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package com.apache.eventmesh.admin.server;
-
-public class AdminException extends RuntimeException {
-    public AdminException(String message) {
-        super(message);
-    }
-
-    public AdminException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
index b4ab41a63..a76b4c194 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
@@ -1,18 +1,50 @@
 package com.apache.eventmesh.admin.server;
 
-import com.apache.eventmesh.admin.server.task.Task;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.common.config.ConfigService;
+import org.apache.eventmesh.common.remote.Task;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
+import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.PagedList;
+import org.apache.eventmesh.registry.RegisterServerInfo;
+import org.apache.eventmesh.registry.RegistryFactory;
 import org.apache.eventmesh.registry.RegistryService;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.stereotype.Service;
 
-public class AdminServer implements Admin {
+import javax.annotation.PostConstruct;
 
-    private RegistryService registryService;
+@Service
+@Slf4j
+public class AdminServer implements Admin, 
ApplicationListener<ApplicationReadyEvent> {
 
-//    private EventMeshAdminServerRegisterInfo registerInfo;
+    private final RegistryService registryService;
 
-    public AdminServer(RegistryService registryService) {
-        this.registryService = registryService;
-//        this.registerInfo = registerInfo;
+    private final RegisterServerInfo adminServeInfo;
+
+    private final CommonConfiguration configuration;
+
+    public AdminServer(AdminServerProperties properties) {
+        configuration =
+                
ConfigService.getInstance().buildConfigInstance(CommonConfiguration.class);
+        if (configuration == null) {
+            throw new 
AdminServerRuntimeException(ErrorCode.STARTUP_CONFIG_MISS, "common 
configuration file miss");
+        }
+        this.adminServeInfo = new RegisterServerInfo();
+
+        adminServeInfo.setHealth(true);
+        adminServeInfo.setAddress(IPUtils.getLocalAddress() + ":" + 
properties.getPort());
+        String name = Constants.ADMIN_SERVER_REGISTRY_NAME;
+        if (StringUtils.isNotBlank(properties.getServiceName())) {
+            name = properties.getServiceName();
+        }
+        adminServeInfo.setServiceName(name);
+        registryService = 
RegistryFactory.getInstance(configuration.getEventMeshRegistryPluginType());
     }
 
 
@@ -37,18 +69,35 @@ public class AdminServer implements Admin {
     }
 
     @Override
-    public void reportHeartbeat(HeartBeat heartBeat) {
+    public void reportHeartbeat(ReportHeartBeatRequest heartBeat) {
 
     }
 
     @Override
+    @PostConstruct
     public void start() {
-        registryService.register(null);
+        if (configuration.isEventMeshRegistryPluginEnabled()) {
+            registryService.init();
+        }
     }
 
     @Override
     public void destroy() {
-        registryService.unRegister(null);
-        registryService.shutdown();
+        if (configuration.isEventMeshRegistryPluginEnabled()) {
+            registryService.unRegister(adminServeInfo);
+            try {
+                Thread.sleep(3000);
+            } catch (InterruptedException ignore) {
+            }
+            registryService.shutdown();
+        }
+    }
+
+    @Override
+    public void onApplicationEvent(ApplicationReadyEvent event) {
+        if (configuration.isEventMeshRegistryPluginEnabled()) {
+            log.info("application is started and registry plugin is enabled, 
it's will register admin self");
+            registryService.register(adminServeInfo);
+        }
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java
new file mode 100644
index 000000000..ec507d599
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerProperties.java
@@ -0,0 +1,16 @@
+package com.apache.eventmesh.admin.server;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("event-mesh.admin-server")
+@Getter
+@Setter
+public class AdminServerProperties {
+    private int port;
+    private boolean enableSSL;
+    private String configurationPath;
+    private String configurationFile;
+    private String serviceName;
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java
new file mode 100644
index 000000000..0282bab39
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServerRuntimeException.java
@@ -0,0 +1,12 @@
+package com.apache.eventmesh.admin.server;
+
+import lombok.Getter;
+
+public class AdminServerRuntimeException extends RuntimeException {
+    @Getter
+    private final int code;
+    public AdminServerRuntimeException(int code, String message) {
+        super(message);
+        this.code = code;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
index 76abd005b..cc12f1afd 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
@@ -1,6 +1,6 @@
 package com.apache.eventmesh.admin.server;
 
 public interface ComponentLifeCycle {
-    void start();
+    void start() throws Exception;
     void destroy();
 }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java
deleted file mode 100644
index aab5b7cc7..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.apache.eventmesh.admin.server;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import org.apache.eventmesh.common.config.CommonConfiguration;
-import org.apache.eventmesh.common.config.Config;
-import org.apache.eventmesh.common.config.ConfigFiled;
-
-@Data
-@NoArgsConstructor
-@EqualsAndHashCode(callSuper = true)
-@Config(prefix = "eventMesh.admin")
-public class EventMeshAdminServerConfiguration extends CommonConfiguration {
-    @ConfigFiled(field = "server.http.port")
-    private int eventMeshHttpServerPort = 10000;
-
-    @ConfigFiled(field = "server.gRPC.port")
-    private int eventMeshGrpcServerPort = 10000;
-
-    @ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true)
-    private String registryCenterAddr = "";
-
-    @ConfigFiled(field = "registry.plugin.type", notEmpty = true)
-    private String eventMeshRegistryPluginType = "nacos";
-
-    @ConfigFiled(field = "registry.plugin.username")
-    private String eventMeshRegistryPluginUsername = "";
-
-    @ConfigFiled(field = "registry.plugin.password")
-    private String eventMeshRegistryPluginPassword = "";
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java
new file mode 100644
index 000000000..51e2e1038
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ExampleAdminServer.java
@@ -0,0 +1,14 @@
+package com.apache.eventmesh.admin.server;
+
+import com.apache.eventmesh.admin.server.constatns.AdminServerConstants;
+import org.apache.eventmesh.common.config.ConfigService;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ExampleAdminServer {
+    public static void main(String[] args) throws Exception {
+        
ConfigService.getInstance().setConfigPath(AdminServerConstants.EVENTMESH_CONF_HOME).setRootConfig(AdminServerConstants.EVENTMESH_CONF_FILE);
+        SpringApplication.run(ExampleAdminServer.class);
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java
deleted file mode 100644
index 568b7ff31..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.apache.eventmesh.admin.server;
-
-import com.apache.eventmesh.admin.server.task.JobState;
-import com.apache.eventmesh.admin.server.task.Position;
-import lombok.Data;
-
-@Data
-public class HeartBeat {
-    private String address;
-    private String reportedTimeStamp;
-    private String jobID;
-    private Position position;
-    private JobState state;
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java
new file mode 100644
index 000000000..d3c01fedc
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/constatns/AdminServerConstants.java
@@ -0,0 +1,9 @@
+package com.apache.eventmesh.admin.server.constatns;
+
+public class AdminServerConstants {
+    public static final String CONF_ENV = "configurationPath";
+
+    public static final String EVENTMESH_CONF_HOME = 
System.getProperty(CONF_ENV, System.getenv(CONF_ENV));
+
+    public static final String EVENTMESH_CONF_FILE = 
"eventmesh-admin.properties";
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java
deleted file mode 100644
index 1fe5b0897..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package com.apache.eventmesh.admin.server.task;
-
-public class Job {
-    private long id;
-    private long taskID;
-    private JobType type;
-    private JobState state;
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java
deleted file mode 100644
index 845d91c4a..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.apache.eventmesh.admin.server.task;
-
-public enum JobState {
-    INIT,
-    STARaTED,
-    PAUSE,
-    COMPLETE,
-    DELETE,
-    FAIL
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java
deleted file mode 100644
index b69480398..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package com.apache.eventmesh.admin.server.task;
-
-public enum JobType {
-    FULL,
-    INCREASE,
-    STRUCT_SYNC
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java
deleted file mode 100644
index 491f796a9..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.apache.eventmesh.admin.server.task;
-
-public class Position {
-
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java
deleted file mode 100644
index 4f6cb7cfe..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.apache.eventmesh.admin.server.task;
-
-// task : job = 1 : m
-public class Task {
-    private long id;
-    private String name;
-    private String desc;
-    private String uid;
-    private String sourceUser;
-    private String sourcePasswd;
-    private String targetUser;
-    private String targetPasswd;
-    private int sourceType;
-    private int targetType;
-
-
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java
new file mode 100644
index 000000000..751ffd85b
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/AdminGrpcServer.java
@@ -0,0 +1,85 @@
+package com.apache.eventmesh.admin.server.web;
+
+import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import com.apache.eventmesh.admin.server.web.handler.RequestHandlerFactory;
+import io.grpc.stub.ServerCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.AdminServiceGrpc;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Payload;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.payload.PayloadUtil;
+import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
+import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
+import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
+import org.apache.eventmesh.common.remote.response.FailResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class AdminGrpcServer extends AdminServiceGrpc.AdminServiceImplBase {
+    @Autowired
+    RequestHandlerFactory handlerFactory;
+
+    private Payload process(Payload value) {
+        if (value == null || 
StringUtils.isBlank(value.getMetadata().getType())) {
+            return PayloadUtil.from(FailResponse.build(ErrorCode.BAD_REQUEST, 
"bad request: type not " +
+                    "exists"));
+        }
+        try {
+            BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> handler =
+                    handlerFactory.getHandler(value.getMetadata().getType());
+            if (handler == null) {
+                return 
PayloadUtil.from(FailResponse.build(BaseRemoteResponse.UNKNOWN,
+                        "not match any request handler"));
+            }
+            BaseRemoteResponse response = 
handler.handlerRequest((BaseRemoteRequest) PayloadUtil.parse(value), 
value.getMetadata());
+            if (response == null || response instanceof EmptyAckResponse) {
+                return null;
+            }
+            return PayloadUtil.from(response);
+        } catch (Exception e) {
+            log.warn("process payload {} fail", value.getMetadata().getType(), 
e);
+            if (e instanceof AdminServerRuntimeException) {
+                return 
PayloadUtil.from(FailResponse.build(((AdminServerRuntimeException)e).getCode(),
+                        e.getMessage()));
+            }
+            return PayloadUtil.from(FailResponse.build(ErrorCode.INTERNAL_ERR, 
"admin server internal err"));
+        }
+    }
+
+    public StreamObserver<Payload> invokeBiStream(StreamObserver<Payload> 
responseObserver) {
+        return new StreamObserver<Payload>() {
+            @Override
+            public void onNext(Payload value) {
+                Payload payload = process(value);
+                if (payload == null) {
+                    return;
+                }
+                responseObserver.onNext(payload);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+                if (responseObserver instanceof ServerCallStreamObserver) {
+                    if (!((ServerCallStreamObserver<Payload>) 
responseObserver).isCancelled()) {
+                        log.warn("admin gRPC server fail", t);
+                    }
+                }
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onCompleted();
+            }
+        };
+    }
+
+    public void invoke(Payload request, StreamObserver<Payload> 
responseObserver) {
+        responseObserver.onNext(process(request));
+        responseObserver.onCompleted();
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java
new file mode 100644
index 000000000..764be8a9b
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/BaseServer.java
@@ -0,0 +1,30 @@
+package com.apache.eventmesh.admin.server.web;
+
+import com.apache.eventmesh.admin.server.ComponentLifeCycle;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.remote.payload.PayloadFactory;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+@Slf4j
+public abstract class BaseServer implements ComponentLifeCycle {
+    static {
+        PayloadFactory.getInstance().init();
+    }
+    @PostConstruct
+    public void init() throws Exception {
+        log.info("[{}] server starting at port [{}]", 
this.getClass().getSimpleName(), getPort());
+        start();
+        log.info("[{}] server started at port [{}]", 
this.getClass().getSimpleName(), getPort());
+    }
+
+    @PreDestroy
+    public void shutdown() {
+        log.info("[{}] server will destroy", this.getClass().getSimpleName());
+        destroy();
+        log.info("[{}] server has be destroy", 
this.getClass().getSimpleName());
+    }
+
+    public abstract int getPort();
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
index fee889a89..1a1ccb17b 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
@@ -1,19 +1,53 @@
 package com.apache.eventmesh.admin.server.web;
 
-import com.apache.eventmesh.admin.server.ComponentLifeCycle;
-import 
org.apache.eventmesh.common.protocol.grpc.adminserver.AdminBiStreamServiceGrpc;
+import com.apache.eventmesh.admin.server.AdminServerProperties;
+import io.grpc.Server;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 
+import java.util.concurrent.TimeUnit;
+
 @Controller
-public class GrpcServer extends 
AdminBiStreamServiceGrpc.AdminBiStreamServiceImplBase implements 
ComponentLifeCycle {
+@Slf4j
+public class GrpcServer extends BaseServer {
 
-    @Override
-    public void start() {
+    @Autowired
+    AdminGrpcServer adminGrpcServer;
+
+    @Autowired
+    AdminServerProperties properties;
+
+    private Server server;
 
+    @Override
+    public void start() throws Exception {
+        NettyServerBuilder serverBuilder = 
NettyServerBuilder.forPort(getPort()).addService(adminGrpcServer);
+        if (properties.isEnableSSL()) {
+            serverBuilder.sslContext(null);
+        }
+        server = serverBuilder.build();
+        server.start();
     }
 
     @Override
     public void destroy() {
+        try {
+            if (server != null) {
+                server.shutdown();
+                if(!server.awaitTermination(30, TimeUnit.SECONDS)) {
+                    log.warn("[{}] server don't graceful stop in 30s, it will 
shutdown now", this.getClass().getSimpleName());
+                    server.shutdownNow();
+                }
+            }
+        } catch (InterruptedException e) {
+            log.warn("destroy [{}] server fail", 
this.getClass().getSimpleName(), e);
+        }
+    }
 
+    @Override
+    public int getPort() {
+        return properties.getPort();
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java
new file mode 100644
index 000000000..e212dd2c5
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/DBThreadPool.java
@@ -0,0 +1,38 @@
+package com.apache.eventmesh.admin.server.web.db;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Component
+@Slf4j
+public class DBThreadPool {
+    private final ThreadPoolExecutor executor =
+            new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() 
* 2,
+                    Runtime.getRuntime().availableProcessors() * 2, 0L, 
TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<>(1000), new 
EventMeshThreadFactory("admin-server-db"),
+                    new ThreadPoolExecutor.DiscardOldestPolicy());
+    @PreDestroy
+    private void destroy() {
+        if (!executor.isShutdown()) {
+            try {
+                executor.shutdown();
+                if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    log.info("wait heart beat handler thread pool shutdown 
timeout, it will shutdown immediately");
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                log.warn("wait heart beat handler thread pool shutdown fail");
+            }
+        }
+    }
+
+    public ThreadPoolExecutor getExecutors() {
+        return executor;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
new file mode 100644
index 000000000..9df705048
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshDataSource.java
@@ -0,0 +1,35 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * @TableName event_mesh_data_source
+ */
+@TableName(value ="event_mesh_data_source")
+@Data
+public class EventMeshDataSource implements Serializable {
+    @TableId(type = IdType.AUTO)
+    private Integer id;
+
+    private Integer dataType;
+
+    private String description;
+
+    private String configuration;
+
+    private Integer createUid;
+
+    private Integer updateUid;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
new file mode 100644
index 000000000..80174a239
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java
@@ -0,0 +1,29 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import lombok.Data;
+import org.apache.eventmesh.common.remote.JobState;
+import org.apache.eventmesh.common.remote.job.JobTransportType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+
+import java.util.Map;
+
+@Data
+public class EventMeshJobDetail {
+    private Integer id;
+
+    private String name;
+
+    private JobTransportType transportType;
+
+    private Map<String, Object> sourceConnectorConfig;
+
+    private String sourceConnectorDesc;
+
+    private Map<String, Object> sinkConnectorConfig;
+
+    private String sinkConnectorDesc;
+
+    private RecordPosition position;
+
+    private JobState state;
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
new file mode 100644
index 000000000..efb3c95d0
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshJobInfo.java
@@ -0,0 +1,41 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Entity for the event_mesh_job_info table; accessors are generated by Lombok's @Data.
+ */
+@TableName(value ="event_mesh_job_info")
+@Data
+public class EventMeshJobInfo implements Serializable {
+    @TableId(type = IdType.AUTO) // primary key; IdType.AUTO leaves id generation to the database
+    private Integer jobID;
+
+    private String name;
+
+    private Integer transportType; // NOTE(review): integer code; its value set is not visible here
+
+    private Integer sourceData; // presumably an event_mesh_data_source id - confirm
+
+    private Integer targetData; // presumably an event_mesh_data_source id - confirm
+
+    private Integer state; // NOTE(review): integer code; its value set is not visible here
+
+    private Integer jobType; // NOTE(review): integer code; its value set is not visible here
+
+    private Integer createUid; // presumably id of the creating user - confirm
+
+    private Integer updateUid; // presumably id of the last updating user - confirm
+
+    private Date createTime;
+
+    private Date updateTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
new file mode 100644
index 000000000..75a7c245e
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshMysqlPosition.java
@@ -0,0 +1,35 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Entity for the event_mesh_mysql_position table; accessors are generated by Lombok's @Data.
+ */
+@TableName(value ="event_mesh_mysql_position")
+@Data
+public class EventMeshMysqlPosition implements Serializable {
+    @TableId(type = IdType.AUTO) // primary key; IdType.AUTO leaves id generation to the database
+    private Integer id;
+
+    private Integer jobID; // presumably refers to EventMeshJobInfo.jobID - confirm
+
+    private String address; // NOTE(review): looks like the reporter's address - confirm
+
+    private Long position; // presumably the offset inside journalName - confirm
+
+    private Long timestamp; // NOTE(review): unit (ms vs s) is not visible here - confirm
+
+    private String journalName; // presumably the MySQL binlog file name - confirm
+
+    private Date createTime;
+
+    private Date updateTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java
new file mode 100644
index 000000000..b1c51e7b4
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshPositionReporterHistory.java
@@ -0,0 +1,29 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Entity for the event_mesh_position_reporter_history table; accessors are generated by Lombok's @Data.
+ */
+@TableName(value ="event_mesh_position_reporter_history")
+@Data
+public class EventMeshPositionReporterHistory implements Serializable {
+    @TableId(type = IdType.AUTO) // primary key; IdType.AUTO leaves id generation to the database
+    private Long id;
+
+    private Integer job; // presumably a job id - confirm
+
+    private String record; // NOTE(review): looks like the serialized old position record - confirm
+
+    private String address; // NOTE(review): looks like the previous reporter's address - confirm
+
+    private Date createTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java
new file mode 100644
index 000000000..298a50173
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHeartbeat.java
@@ -0,0 +1,33 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Entity for the event_mesh_runtime_heartbeat table; accessors are generated by Lombok's @Data.
+ */
+@TableName(value ="event_mesh_runtime_heartbeat")
+@Data
+public class EventMeshRuntimeHeartbeat implements Serializable {
+    @TableId(type = IdType.AUTO) // primary key; IdType.AUTO leaves id generation to the database
+    private Long id;
+
+    private String adminAddr; // presumably address of the admin server that got the heartbeat - confirm
+
+    private String runtimeAddr; // presumably address of the reporting runtime - confirm
+
+    private Integer jobID; // presumably refers to EventMeshJobInfo.jobID - confirm
+
+    private String reportTime; // kept as a String, unlike the Date timestamps below; format not visible here
+
+    private Date updateTime;
+
+    private Date createTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java
new file mode 100644
index 000000000..7d3e3f463
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/entity/EventMeshRuntimeHistory.java
@@ -0,0 +1,27 @@
+package com.apache.eventmesh.admin.server.web.db.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Entity for the event_mesh_runtime_history table; accessors are generated by Lombok's @Data.
+ */
+@TableName(value ="event_mesh_runtime_history")
+@Data
+public class EventMeshRuntimeHistory implements Serializable {
+    @TableId(type = IdType.AUTO) // primary key; IdType.AUTO leaves id generation to the database
+    private Long id;
+
+    private Integer job; // presumably a job id - confirm
+
+    private String address; // NOTE(review): looks like the runtime's address - confirm
+
+    private Date createTime;
+
+    private static final long serialVersionUID = 1L;
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java
new file mode 100644
index 000000000..805d3642a
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshDataSourceMapper.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.web.db.mapper;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+* @author sodafang
+* @description Database operation Mapper for table [event_mesh_data_source]; adds no methods beyond the inherited BaseMapper ones
+* @createDate 2024-05-09 15:52:49
+* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource
+*/
+@Mapper
+public interface EventMeshDataSourceMapper extends 
BaseMapper<EventMeshDataSource> {
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java
new file mode 100644
index 000000000..0082a9f2e
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshJobInfoMapper.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.web.db.mapper;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+* @author sodafang
+* @description Database operation Mapper for table [event_mesh_job_info]; adds no methods beyond the inherited BaseMapper ones
+* @createDate 2024-05-09 15:51:45
+* @Entity com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo
+*/
+@Mapper
+public interface EventMeshJobInfoMapper extends BaseMapper<EventMeshJobInfo> {
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java
new file mode 100644
index 000000000..1b29508c1
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshMysqlPositionMapper.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.web.db.mapper;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+* @author sodafang
+* @description Database operation Mapper for table [event_mesh_mysql_position]; adds no methods beyond the inherited BaseMapper ones
+* @createDate 2024-05-14 17:15:03
+* @Entity 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition
+*/
+@Mapper
+public interface EventMeshMysqlPositionMapper extends 
BaseMapper<EventMeshMysqlPosition> {
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java
new file mode 100644
index 000000000..2cdc59f71
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshPositionReporterHistoryMapper.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.web.db.mapper;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+* @author sodafang
+* @description 
Database operation Mapper for table [event_mesh_position_reporter_history] (keeps the old record when the position reporter changes)
+* @createDate 2024-05-14 17:15:03
+* @Entity 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory
+*/
+@Mapper
+public interface EventMeshPositionReporterHistoryMapper extends 
BaseMapper<EventMeshPositionReporterHistory> {
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java
new file mode 100644
index 000000000..a23ef0422
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHeartbeatMapper.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.web.db.mapper;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+* @author sodafang
+* @description Database operation Mapper for table [event_mesh_runtime_heartbeat]; adds no methods beyond the inherited BaseMapper ones
+* @createDate 2024-05-14 17:15:03
+* @Entity 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat
+*/
+@Mapper
+public interface EventMeshRuntimeHeartbeatMapper extends 
BaseMapper<EventMeshRuntimeHeartbeat> {
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java
new file mode 100644
index 000000000..85fa51aec
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/mapper/EventMeshRuntimeHistoryMapper.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.web.db.mapper;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+* @author sodafang
+* @description Database operation Mapper for table [event_mesh_runtime_history] (records changes of the jobs running on a runtime)
+* @createDate 2024-05-14 17:15:03
+* @Entity 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory
+*/
+@Mapper
+public interface EventMeshRuntimeHistoryMapper extends 
BaseMapper<EventMeshRuntimeHistory> {
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java
new file mode 100644
index 000000000..6b1e2d8f7
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshDataSourceService.java
@@ -0,0 +1,13 @@
+package com.apache.eventmesh.admin.server.web.db.service;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author sodafang
+* @description Database operation Service for table [event_mesh_data_source]; adds no methods beyond the inherited IService ones
+* @createDate 2024-05-09 15:52:49
+*/
+public interface EventMeshDataSourceService extends 
IService<EventMeshDataSource> {
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java
new file mode 100644
index 000000000..c9a1e972a
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshJobInfoService.java
@@ -0,0 +1,12 @@
+package com.apache.eventmesh.admin.server.web.db.service;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author sodafang
+* @description Database operation Service for table [event_mesh_job_info]; adds no methods beyond the inherited IService ones
+* @createDate 2024-05-09 15:51:45
+*/
+public interface EventMeshJobInfoService extends IService<EventMeshJobInfo> {
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java
new file mode 100644
index 000000000..83597b047
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshMysqlPositionService.java
@@ -0,0 +1,12 @@
+package com.apache.eventmesh.admin.server.web.db.service;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author sodafang
+* @description Database operation Service for table [event_mesh_mysql_position]; adds no methods beyond the inherited IService ones
+* @createDate 2024-05-14 17:15:03
+*/
+public interface EventMeshMysqlPositionService extends 
IService<EventMeshMysqlPosition> {
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java
new file mode 100644
index 000000000..adaac4395
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshPositionReporterHistoryService.java
@@ -0,0 +1,13 @@
+package com.apache.eventmesh.admin.server.web.db.service;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author sodafang
+* @description 
Database operation Service for table [event_mesh_position_reporter_history] (keeps the old record when the position reporter changes)
+* @createDate 2024-05-14 17:15:03
+*/
+public interface EventMeshPositionReporterHistoryService extends 
IService<EventMeshPositionReporterHistory> {
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java
new file mode 100644
index 000000000..3b9b8465c
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHeartbeatService.java
@@ -0,0 +1,12 @@
+package com.apache.eventmesh.admin.server.web.db.service;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author sodafang
+* @description Database operation Service for table [event_mesh_runtime_heartbeat]; adds no methods beyond the inherited IService ones
+* @createDate 2024-05-14 17:15:03
+*/
+public interface EventMeshRuntimeHeartbeatService extends 
IService<EventMeshRuntimeHeartbeat> {
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java
new file mode 100644
index 000000000..0da277f26
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/EventMeshRuntimeHistoryService.java
@@ -0,0 +1,13 @@
+package com.apache.eventmesh.admin.server.web.db.service;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+* @author sodafang
+* @description Database operation Service for table [event_mesh_runtime_history] (records changes of the jobs running on a runtime)
+* @createDate 2024-05-14 17:15:03
+*/
+public interface EventMeshRuntimeHistoryService extends 
IService<EventMeshRuntimeHistory> {
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java
new file mode 100644
index 000000000..0d9f4f296
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshDataSourceServiceImpl.java
@@ -0,0 +1,22 @@
+package com.apache.eventmesh.admin.server.web.db.service.impl;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
+import 
com.apache.eventmesh.admin.server.web.db.mapper.EventMeshDataSourceMapper;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+* @author sodafang
+* @description Database operation Service implementation for table [event_mesh_data_source]; all behavior is inherited from ServiceImpl
+* @createDate 2024-05-09 15:52:49
+*/
+@Service
+public class EventMeshDataSourceServiceImpl extends 
ServiceImpl<EventMeshDataSourceMapper, EventMeshDataSource>
+    implements EventMeshDataSourceService{
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java
new file mode 100644
index 000000000..eb2a6bf60
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshJobInfoServiceImpl.java
@@ -0,0 +1,23 @@
+package com.apache.eventmesh.admin.server.web.db.service.impl;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+* @author sodafang
+* @description Database operation Service implementation for table [event_mesh_job_info]; all behavior is inherited from ServiceImpl
+* @createDate 2024-05-09 15:51:45
+*/
+@Service
+@Slf4j // logger is generated but unused: the class body is empty
+public class EventMeshJobInfoServiceImpl extends 
ServiceImpl<EventMeshJobInfoMapper, EventMeshJobInfo>
+    implements EventMeshJobInfoService{
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java
new file mode 100644
index 000000000..a3bfa4770
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshMysqlPositionServiceImpl.java
@@ -0,0 +1,23 @@
+package com.apache.eventmesh.admin.server.web.db.service.impl;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
+import 
com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+* @author sodafang
+* @description Database operation Service implementation for table [event_mesh_mysql_position]; all behavior is inherited from ServiceImpl
+* @createDate 2024-05-14 17:15:03
+*/
+@Service
+@Slf4j // logger is generated but unused: the class body is empty
+public class EventMeshMysqlPositionServiceImpl extends 
ServiceImpl<EventMeshMysqlPositionMapper, EventMeshMysqlPosition>
+    implements EventMeshMysqlPositionService{
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java
new file mode 100644
index 000000000..071d44f66
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshPositionReporterHistoryServiceImpl.java
@@ -0,0 +1,22 @@
+package com.apache.eventmesh.admin.server.web.db.service.impl;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory;
+import 
com.apache.eventmesh.admin.server.web.db.mapper.EventMeshPositionReporterHistoryMapper;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+* @author sodafang
+* @description 
Database operation Service implementation for table [event_mesh_position_reporter_history] (keeps the old record when the position reporter changes)
+* @createDate 2024-05-14 17:15:03
+*/
+@Service
+public class EventMeshPositionReporterHistoryServiceImpl extends 
ServiceImpl<EventMeshPositionReporterHistoryMapper, 
EventMeshPositionReporterHistory>
+    implements EventMeshPositionReporterHistoryService{
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java
new file mode 100644
index 000000000..824bd3aec
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHeartbeatServiceImpl.java
@@ -0,0 +1,23 @@
+package com.apache.eventmesh.admin.server.web.db.service.impl;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
+import 
com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+* @author sodafang
+* @description Database operation Service implementation for table [event_mesh_runtime_heartbeat]; all behavior is inherited from ServiceImpl
+* @createDate 2024-05-14 17:15:03
+*/
+@Service
+@Slf4j // logger is generated but unused: the class body is empty
+public class EventMeshRuntimeHeartbeatServiceImpl extends 
ServiceImpl<EventMeshRuntimeHeartbeatMapper, EventMeshRuntimeHeartbeat>
+    implements EventMeshRuntimeHeartbeatService{
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java
new file mode 100644
index 000000000..c3883e55b
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/db/service/impl/EventMeshRuntimeHistoryServiceImpl.java
@@ -0,0 +1,22 @@
+package com.apache.eventmesh.admin.server.web.db.service.impl;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory;
+import 
com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHistoryMapper;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+/**
+* @author sodafang
+* @description 
Database operation Service implementation for table [event_mesh_runtime_history] (records changes of the jobs running on a runtime)
+* @createDate 2024-05-14 17:15:03
+*/
+@Service
+public class EventMeshRuntimeHistoryServiceImpl extends 
ServiceImpl<EventMeshRuntimeHistoryMapper, EventMeshRuntimeHistory>
+    implements EventMeshRuntimeHistoryService{
+
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java
deleted file mode 100644
index 2a10de9ac..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminBiStreamServiceGrpc.java
+++ /dev/null
@@ -1,263 +0,0 @@
-package com.apache.eventmesh.admin.server.web.generated;
-
-import org.apache.eventmesh.common.grpc.EventMeshAdminService;
-import org.apache.eventmesh.common.grpc.Payload;
-
-import static io.grpc.MethodDescriptor.generateFullMethodName;
-
-/**
- */
[email protected](
-    value = "by gRPC proto compiler (version 1.40.0)",
-    comments = "Source: event_mesh_admin_service.proto")
[email protected]
-public final class AdminBiStreamServiceGrpc {
-
-  private AdminBiStreamServiceGrpc() {}
-
-  public static final String SERVICE_NAME = "AdminBiStreamService";
-
-  // Static method descriptors that strictly reflect the proto.
-  private static volatile io.grpc.MethodDescriptor<Payload,
-      Payload> getInvokeBiStreamMethod;
-
-  @io.grpc.stub.annotations.RpcMethod(
-      fullMethodName = SERVICE_NAME + '/' + "invokeBiStream",
-      requestType = Payload.class,
-      responseType = Payload.class,
-      methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
-  public static io.grpc.MethodDescriptor<Payload,
-      Payload> getInvokeBiStreamMethod() {
-    io.grpc.MethodDescriptor<Payload, Payload> getInvokeBiStreamMethod;
-    if ((getInvokeBiStreamMethod = 
AdminBiStreamServiceGrpc.getInvokeBiStreamMethod) == null) {
-      synchronized (AdminBiStreamServiceGrpc.class) {
-        if ((getInvokeBiStreamMethod = 
AdminBiStreamServiceGrpc.getInvokeBiStreamMethod) == null) {
-          AdminBiStreamServiceGrpc.getInvokeBiStreamMethod = 
getInvokeBiStreamMethod =
-              io.grpc.MethodDescriptor.<Payload, Payload>newBuilder()
-              .setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
-              .setFullMethodName(generateFullMethodName(SERVICE_NAME, 
"invokeBiStream"))
-              .setSampledToLocalTracing(true)
-              .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
-                  Payload.getDefaultInstance()))
-              .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
-                  Payload.getDefaultInstance()))
-              .setSchemaDescriptor(new 
AdminBiStreamServiceMethodDescriptorSupplier("invokeBiStream"))
-              .build();
-        }
-      }
-    }
-    return getInvokeBiStreamMethod;
-  }
-
-  /**
-   * Creates a new async stub that supports all call types for the service
-   */
-  public static AdminBiStreamServiceStub newStub(io.grpc.Channel channel) {
-    io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceStub> factory =
-      new io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceStub>() {
-        @Override
-        public AdminBiStreamServiceStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
-          return new AdminBiStreamServiceStub(channel, callOptions);
-        }
-      };
-    return AdminBiStreamServiceStub.newStub(factory, channel);
-  }
-
-  /**
-   * Creates a new blocking-style stub that supports unary and streaming 
output calls on the service
-   */
-  public static AdminBiStreamServiceBlockingStub newBlockingStub(
-      io.grpc.Channel channel) {
-    io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceBlockingStub> 
factory =
-      new 
io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceBlockingStub>() {
-        @Override
-        public AdminBiStreamServiceBlockingStub newStub(io.grpc.Channel 
channel, io.grpc.CallOptions callOptions) {
-          return new AdminBiStreamServiceBlockingStub(channel, callOptions);
-        }
-      };
-    return AdminBiStreamServiceBlockingStub.newStub(factory, channel);
-  }
-
-  /**
-   * Creates a new ListenableFuture-style stub that supports unary calls on 
the service
-   */
-  public static AdminBiStreamServiceFutureStub newFutureStub(
-      io.grpc.Channel channel) {
-    io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceFutureStub> 
factory =
-      new 
io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceFutureStub>() {
-        @Override
-        public AdminBiStreamServiceFutureStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
-          return new AdminBiStreamServiceFutureStub(channel, callOptions);
-        }
-      };
-    return AdminBiStreamServiceFutureStub.newStub(factory, channel);
-  }
-
-  /**
-   */
-  public static abstract class AdminBiStreamServiceImplBase implements 
io.grpc.BindableService {
-
-    /**
-     */
-    public io.grpc.stub.StreamObserver<Payload> invokeBiStream(
-        io.grpc.stub.StreamObserver<Payload> responseObserver) {
-      return 
io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getInvokeBiStreamMethod(),
 responseObserver);
-    }
-
-    @Override public final io.grpc.ServerServiceDefinition bindService() {
-      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
-          .addMethod(
-            getInvokeBiStreamMethod(),
-            io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
-              new MethodHandlers<
-                Payload,
-                Payload>(
-                  this, METHODID_INVOKE_BI_STREAM)))
-          .build();
-    }
-  }
-
-  /**
-   */
-  public static final class AdminBiStreamServiceStub extends 
io.grpc.stub.AbstractAsyncStub<AdminBiStreamServiceStub> {
-    private AdminBiStreamServiceStub(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      super(channel, callOptions);
-    }
-
-    @Override
-    protected AdminBiStreamServiceStub build(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      return new AdminBiStreamServiceStub(channel, callOptions);
-    }
-
-    /**
-     */
-    public io.grpc.stub.StreamObserver<Payload> invokeBiStream(
-        io.grpc.stub.StreamObserver<Payload> responseObserver) {
-      return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
-          getChannel().newCall(getInvokeBiStreamMethod(), getCallOptions()), 
responseObserver);
-    }
-  }
-
-  /**
-   */
-  public static final class AdminBiStreamServiceBlockingStub extends 
io.grpc.stub.AbstractBlockingStub<AdminBiStreamServiceBlockingStub> {
-    private AdminBiStreamServiceBlockingStub(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      super(channel, callOptions);
-    }
-
-    @Override
-    protected AdminBiStreamServiceBlockingStub build(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      return new AdminBiStreamServiceBlockingStub(channel, callOptions);
-    }
-  }
-
-  /**
-   */
-  public static final class AdminBiStreamServiceFutureStub extends 
io.grpc.stub.AbstractFutureStub<AdminBiStreamServiceFutureStub> {
-    private AdminBiStreamServiceFutureStub(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      super(channel, callOptions);
-    }
-
-    @Override
-    protected AdminBiStreamServiceFutureStub build(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      return new AdminBiStreamServiceFutureStub(channel, callOptions);
-    }
-  }
-
-  private static final int METHODID_INVOKE_BI_STREAM = 0;
-
-  private static final class MethodHandlers<Req, Resp> implements
-      io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
-      io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
-      io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
-      io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
-    private final AdminBiStreamServiceImplBase serviceImpl;
-    private final int methodId;
-
-    MethodHandlers(AdminBiStreamServiceImplBase serviceImpl, int methodId) {
-      this.serviceImpl = serviceImpl;
-      this.methodId = methodId;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> 
responseObserver) {
-      switch (methodId) {
-        default:
-          throw new AssertionError();
-      }
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public io.grpc.stub.StreamObserver<Req> invoke(
-        io.grpc.stub.StreamObserver<Resp> responseObserver) {
-      switch (methodId) {
-        case METHODID_INVOKE_BI_STREAM:
-          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.invokeBiStream(
-              (io.grpc.stub.StreamObserver<Payload>) responseObserver);
-        default:
-          throw new AssertionError();
-      }
-    }
-  }
-
-  private static abstract class AdminBiStreamServiceBaseDescriptorSupplier
-      implements io.grpc.protobuf.ProtoFileDescriptorSupplier, 
io.grpc.protobuf.ProtoServiceDescriptorSupplier {
-    AdminBiStreamServiceBaseDescriptorSupplier() {}
-
-    @Override
-    public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
-      return EventMeshAdminService.getDescriptor();
-    }
-
-    @Override
-    public com.google.protobuf.Descriptors.ServiceDescriptor 
getServiceDescriptor() {
-      return getFileDescriptor().findServiceByName("AdminBiStreamService");
-    }
-  }
-
-  private static final class AdminBiStreamServiceFileDescriptorSupplier
-      extends AdminBiStreamServiceBaseDescriptorSupplier {
-    AdminBiStreamServiceFileDescriptorSupplier() {}
-  }
-
-  private static final class AdminBiStreamServiceMethodDescriptorSupplier
-      extends AdminBiStreamServiceBaseDescriptorSupplier
-      implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
-    private final String methodName;
-
-    AdminBiStreamServiceMethodDescriptorSupplier(String methodName) {
-      this.methodName = methodName;
-    }
-
-    @Override
-    public com.google.protobuf.Descriptors.MethodDescriptor 
getMethodDescriptor() {
-      return getServiceDescriptor().findMethodByName(methodName);
-    }
-  }
-
-  private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
-
-  public static io.grpc.ServiceDescriptor getServiceDescriptor() {
-    io.grpc.ServiceDescriptor result = serviceDescriptor;
-    if (result == null) {
-      synchronized (AdminBiStreamServiceGrpc.class) {
-        result = serviceDescriptor;
-        if (result == null) {
-          serviceDescriptor = result = 
io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
-              .setSchemaDescriptor(new 
AdminBiStreamServiceFileDescriptorSupplier())
-              .addMethod(getInvokeBiStreamMethod())
-              .build();
-        }
-      }
-    }
-    return result;
-  }
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java
deleted file mode 100644
index 61b418e90..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/generated/AdminServiceGrpc.java
+++ /dev/null
@@ -1,279 +0,0 @@
-package com.apache.eventmesh.admin.server.web.generated;
-
-import org.apache.eventmesh.common.grpc.EventMeshAdminService;
-import org.apache.eventmesh.common.grpc.Payload;
-
-import static io.grpc.MethodDescriptor.generateFullMethodName;
-
-/**
- */
[email protected](
-    value = "by gRPC proto compiler (version 1.40.0)",
-    comments = "Source: event_mesh_admin_service.proto")
[email protected]
-public final class AdminServiceGrpc {
-
-  private AdminServiceGrpc() {}
-
-  public static final String SERVICE_NAME = "AdminService";
-
-  // Static method descriptors that strictly reflect the proto.
-  private static volatile io.grpc.MethodDescriptor<Payload,
-      Payload> getInvokeMethod;
-
-  @io.grpc.stub.annotations.RpcMethod(
-      fullMethodName = SERVICE_NAME + '/' + "invoke",
-      requestType = Payload.class,
-      responseType = Payload.class,
-      methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
-  public static io.grpc.MethodDescriptor<Payload,
-      Payload> getInvokeMethod() {
-    io.grpc.MethodDescriptor<Payload, Payload> getInvokeMethod;
-    if ((getInvokeMethod = AdminServiceGrpc.getInvokeMethod) == null) {
-      synchronized (AdminServiceGrpc.class) {
-        if ((getInvokeMethod = AdminServiceGrpc.getInvokeMethod) == null) {
-          AdminServiceGrpc.getInvokeMethod = getInvokeMethod =
-              io.grpc.MethodDescriptor.<Payload, Payload>newBuilder()
-              .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
-              .setFullMethodName(generateFullMethodName(SERVICE_NAME, 
"invoke"))
-              .setSampledToLocalTracing(true)
-              .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
-                  Payload.getDefaultInstance()))
-              .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
-                  Payload.getDefaultInstance()))
-              .setSchemaDescriptor(new 
AdminServiceMethodDescriptorSupplier("invoke"))
-              .build();
-        }
-      }
-    }
-    return getInvokeMethod;
-  }
-
-  /**
-   * Creates a new async stub that supports all call types for the service
-   */
-  public static AdminServiceStub newStub(io.grpc.Channel channel) {
-    io.grpc.stub.AbstractStub.StubFactory<AdminServiceStub> factory =
-      new io.grpc.stub.AbstractStub.StubFactory<AdminServiceStub>() {
-        @Override
-        public AdminServiceStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
-          return new AdminServiceStub(channel, callOptions);
-        }
-      };
-    return AdminServiceStub.newStub(factory, channel);
-  }
-
-  /**
-   * Creates a new blocking-style stub that supports unary and streaming 
output calls on the service
-   */
-  public static AdminServiceBlockingStub newBlockingStub(
-      io.grpc.Channel channel) {
-    io.grpc.stub.AbstractStub.StubFactory<AdminServiceBlockingStub> factory =
-      new io.grpc.stub.AbstractStub.StubFactory<AdminServiceBlockingStub>() {
-        @Override
-        public AdminServiceBlockingStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
-          return new AdminServiceBlockingStub(channel, callOptions);
-        }
-      };
-    return AdminServiceBlockingStub.newStub(factory, channel);
-  }
-
-  /**
-   * Creates a new ListenableFuture-style stub that supports unary calls on 
the service
-   */
-  public static AdminServiceFutureStub newFutureStub(
-      io.grpc.Channel channel) {
-    io.grpc.stub.AbstractStub.StubFactory<AdminServiceFutureStub> factory =
-      new io.grpc.stub.AbstractStub.StubFactory<AdminServiceFutureStub>() {
-        @Override
-        public AdminServiceFutureStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
-          return new AdminServiceFutureStub(channel, callOptions);
-        }
-      };
-    return AdminServiceFutureStub.newStub(factory, channel);
-  }
-
-  /**
-   */
-  public static abstract class AdminServiceImplBase implements 
io.grpc.BindableService {
-
-    /**
-     */
-    public void invoke(Payload request,
-        io.grpc.stub.StreamObserver<Payload> responseObserver) {
-      io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getInvokeMethod(), 
responseObserver);
-    }
-
-    @Override public final io.grpc.ServerServiceDefinition bindService() {
-      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
-          .addMethod(
-            getInvokeMethod(),
-            io.grpc.stub.ServerCalls.asyncUnaryCall(
-              new MethodHandlers<
-                Payload,
-                Payload>(
-                  this, METHODID_INVOKE)))
-          .build();
-    }
-  }
-
-  /**
-   */
-  public static final class AdminServiceStub extends 
io.grpc.stub.AbstractAsyncStub<AdminServiceStub> {
-    private AdminServiceStub(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      super(channel, callOptions);
-    }
-
-    @Override
-    protected AdminServiceStub build(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      return new AdminServiceStub(channel, callOptions);
-    }
-
-    /**
-     */
-    public void invoke(Payload request,
-        io.grpc.stub.StreamObserver<Payload> responseObserver) {
-      io.grpc.stub.ClientCalls.asyncUnaryCall(
-          getChannel().newCall(getInvokeMethod(), getCallOptions()), request, 
responseObserver);
-    }
-  }
-
-  /**
-   */
-  public static final class AdminServiceBlockingStub extends 
io.grpc.stub.AbstractBlockingStub<AdminServiceBlockingStub> {
-    private AdminServiceBlockingStub(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      super(channel, callOptions);
-    }
-
-    @Override
-    protected AdminServiceBlockingStub build(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      return new AdminServiceBlockingStub(channel, callOptions);
-    }
-
-    /**
-     */
-    public Payload invoke(Payload request) {
-      return io.grpc.stub.ClientCalls.blockingUnaryCall(
-          getChannel(), getInvokeMethod(), getCallOptions(), request);
-    }
-  }
-
-  /**
-   */
-  public static final class AdminServiceFutureStub extends 
io.grpc.stub.AbstractFutureStub<AdminServiceFutureStub> {
-    private AdminServiceFutureStub(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      super(channel, callOptions);
-    }
-
-    @Override
-    protected AdminServiceFutureStub build(
-        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
-      return new AdminServiceFutureStub(channel, callOptions);
-    }
-
-    /**
-     */
-    public com.google.common.util.concurrent.ListenableFuture<Payload> invoke(
-        Payload request) {
-      return io.grpc.stub.ClientCalls.futureUnaryCall(
-          getChannel().newCall(getInvokeMethod(), getCallOptions()), request);
-    }
-  }
-
-  private static final int METHODID_INVOKE = 0;
-
-  private static final class MethodHandlers<Req, Resp> implements
-      io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
-      io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
-      io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
-      io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
-    private final AdminServiceImplBase serviceImpl;
-    private final int methodId;
-
-    MethodHandlers(AdminServiceImplBase serviceImpl, int methodId) {
-      this.serviceImpl = serviceImpl;
-      this.methodId = methodId;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> 
responseObserver) {
-      switch (methodId) {
-        case METHODID_INVOKE:
-          serviceImpl.invoke((Payload) request,
-              (io.grpc.stub.StreamObserver<Payload>) responseObserver);
-          break;
-        default:
-          throw new AssertionError();
-      }
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public io.grpc.stub.StreamObserver<Req> invoke(
-        io.grpc.stub.StreamObserver<Resp> responseObserver) {
-      switch (methodId) {
-        default:
-          throw new AssertionError();
-      }
-    }
-  }
-
-  private static abstract class AdminServiceBaseDescriptorSupplier
-      implements io.grpc.protobuf.ProtoFileDescriptorSupplier, 
io.grpc.protobuf.ProtoServiceDescriptorSupplier {
-    AdminServiceBaseDescriptorSupplier() {}
-
-    @Override
-    public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
-      return EventMeshAdminService.getDescriptor();
-    }
-
-    @Override
-    public com.google.protobuf.Descriptors.ServiceDescriptor 
getServiceDescriptor() {
-      return getFileDescriptor().findServiceByName("AdminService");
-    }
-  }
-
-  private static final class AdminServiceFileDescriptorSupplier
-      extends AdminServiceBaseDescriptorSupplier {
-    AdminServiceFileDescriptorSupplier() {}
-  }
-
-  private static final class AdminServiceMethodDescriptorSupplier
-      extends AdminServiceBaseDescriptorSupplier
-      implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
-    private final String methodName;
-
-    AdminServiceMethodDescriptorSupplier(String methodName) {
-      this.methodName = methodName;
-    }
-
-    @Override
-    public com.google.protobuf.Descriptors.MethodDescriptor 
getMethodDescriptor() {
-      return getServiceDescriptor().findMethodByName(methodName);
-    }
-  }
-
-  private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
-
-  public static io.grpc.ServiceDescriptor getServiceDescriptor() {
-    io.grpc.ServiceDescriptor result = serviceDescriptor;
-    if (result == null) {
-      synchronized (AdminServiceGrpc.class) {
-        result = serviceDescriptor;
-        if (result == null) {
-          serviceDescriptor = result = 
io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
-              .setSchemaDescriptor(new AdminServiceFileDescriptorSupplier())
-              .addMethod(getInvokeMethod())
-              .build();
-        }
-      }
-    }
-    return result;
-  }
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java
new file mode 100644
index 000000000..b1c9518c7
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/BaseRequestHandler.java
@@ -0,0 +1,13 @@
+package com.apache.eventmesh.admin.server.web.handler;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
+import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
+
+/**
+ * Base type for admin-server request handlers.
+ *
+ * @param <T> request type the handler accepts
+ * @param <S> response type the handler produces
+ */
+public abstract class BaseRequestHandler<T extends BaseRemoteRequest, S extends BaseRemoteResponse> {
+
+    /**
+     * Public entry point; forwards both arguments unchanged to {@code handler}.
+     *
+     * @param request  the request to process
+     * @param metadata the metadata that accompanied the request
+     * @return whatever {@code handler} returned
+     */
+    public BaseRemoteResponse handlerRequest(T request, Metadata metadata) {
+        final S response = handler(request, metadata);
+        return response;
+    }
+
+    /**
+     * Processes one request; implemented by each concrete handler.
+     *
+     * @param request  the request to process
+     * @param metadata the metadata that accompanied the request
+     * @return the response for this request
+     */
+    protected abstract S handler(T request, Metadata metadata);
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java
new file mode 100644
index 000000000..33535e7f3
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/RequestHandlerFactory.java
@@ -0,0 +1,47 @@
+package com.apache.eventmesh.admin.server.web.handler;
+
+import org.apache.eventmesh.common.remote.request.BaseRemoteRequest;
+import org.apache.eventmesh.common.remote.response.BaseRemoteResponse;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of {@link BaseRequestHandler} beans. On every context refresh each handler bean is
+ * stored under the simple class name of the request type it declares as its first type argument.
+ */
+@Component
+public class RequestHandlerFactory implements ApplicationListener<ContextRefreshedEvent> {
+
+    private final Map<String, BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse>> handlers =
+            new ConcurrentHashMap<>();
+
+    /**
+     * Looks up a handler by request type name.
+     *
+     * @param type simple class name of the request type
+     * @return the registered handler, or {@code null} when none is registered under that name
+     */
+    public BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> getHandler(String type) {
+        return handlers.get(type);
+    }
+
+    /**
+     * Collects all {@link BaseRequestHandler} beans from the refreshed context and registers them.
+     * A name that is already present is kept (first registration wins).
+     *
+     * @param event the context-refreshed event carrying the application context
+     */
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public void onApplicationEvent(ContextRefreshedEvent event) {
+        Map<String, BaseRequestHandler> beans =
+                event.getApplicationContext().getBeansOfType(BaseRequestHandler.class);
+
+        for (BaseRequestHandler<BaseRemoteRequest, BaseRemoteResponse> requestHandler : beans.values()) {
+            Class<?> direct = directSubclassOfBase(requestHandler.getClass());
+            if (direct == null) {
+                continue;
+            }
+
+            // The generic superclass of the direct subclass is BaseRequestHandler<T, S>;
+            // its first type argument is the request type.
+            ParameterizedType base = (ParameterizedType) direct.getGenericSuperclass();
+            Class requestType = (Class) base.getActualTypeArguments()[0];
+            handlers.putIfAbsent(requestType.getSimpleName(), requestHandler);
+        }
+    }
+
+    /**
+     * Walks up the superclass chain from {@code start} to the class whose immediate superclass is
+     * {@link BaseRequestHandler}.
+     *
+     * @param start the runtime class of a handler bean
+     * @return that class, or {@code null} when the chain reaches {@link Object} first
+     */
+    private static Class<?> directSubclassOfBase(Class<?> start) {
+        Class<?> current = start;
+        for (Class<?> parent = current.getSuperclass();
+                !parent.equals(BaseRequestHandler.class);
+                parent = current.getSuperclass()) {
+            if (parent.equals(Object.class)) {
+                return null;
+            }
+            current = parent;
+        }
+        return current;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
new file mode 100644
index 000000000..9362581eb
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchJobRequestHandler.java
@@ -0,0 +1,51 @@
+package com.apache.eventmesh.admin.server.web.handler.impl;
+
+import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail;
+import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import 
com.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.request.FetchJobRequest;
+import org.apache.eventmesh.common.remote.response.FetchJobResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link FetchJobRequest}: validates the job id, asks {@link EventMeshJobInfoBizService}
+ * for the job detail and copies it into a {@link FetchJobResponse}.
+ */
+@Component
+@Slf4j
+public class FetchJobRequestHandler extends BaseRequestHandler<FetchJobRequest, FetchJobResponse> {
+
+    @Autowired
+    EventMeshJobInfoBizService jobInfoBizService;
+
+    /**
+     * Builds the response for a fetch-job request.
+     *
+     * @param request  the request; its job id must be a non-blank decimal integer
+     * @param metadata the metadata that accompanied the request, passed through to the biz service
+     * @return a success response; its fields are populated only when a job detail was found
+     * @throws AdminServerRuntimeException with {@code BAD_REQUEST} when the job id is blank or
+     *                                     not an integer
+     */
+    @Override
+    public FetchJobResponse handler(FetchJobRequest request, Metadata metadata) {
+        if (StringUtils.isBlank(request.getJobID())) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "job id is empty");
+        }
+        // Parsed purely as a format check; the biz service receives the request itself.
+        try {
+            Integer.parseInt(request.getJobID());
+        } catch (NumberFormatException e) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id %s",
+                    request.getJobID()));
+        }
+        FetchJobResponse response = FetchJobResponse.successResponse();
+        EventMeshJobDetail detail = jobInfoBizService.getJobDetail(request, metadata);
+        if (detail == null) {
+            return response;
+        }
+        response.setId(detail.getId());
+        response.setName(detail.getName());
+        response.setSourceConnectorConfig(detail.getSourceConnectorConfig());
+        response.setSourceConnectorDesc(detail.getSourceConnectorDesc());
+        response.setTransportType(detail.getTransportType());
+        response.setSinkConnectorConfig(detail.getSinkConnectorConfig());
+        // Fix: this used to call setSourceConnectorDesc, clobbering the source description
+        // with the sink one and never setting the sink description.
+        response.setSinkConnectorDesc(detail.getSinkConnectorDesc());
+        response.setState(detail.getState());
+        response.setPosition(detail.getPosition());
+        return response;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java
new file mode 100644
index 000000000..bc5bc2050
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/FetchPositionHandler.java
@@ -0,0 +1,36 @@
+package com.apache.eventmesh.admin.server.web.handler.impl;
+
+import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import com.apache.eventmesh.admin.server.web.db.DBThreadPool;
+import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import 
com.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.response.FetchPositionResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link FetchPositionRequest} by validating it and returning the position supplied by
+ * {@link EventMeshPositionBizService}.
+ */
+@Component
+@Slf4j
+public class FetchPositionHandler extends BaseRequestHandler<FetchPositionRequest, FetchPositionResponse> {
+
+    @Autowired
+    DBThreadPool executor;
+
+    @Autowired
+    EventMeshPositionBizService positionBizService;
+
+    /**
+     * Validates the request and wraps the looked-up position in a success response.
+     *
+     * @param request  the request; data source type must be set and job id must be non-blank
+     * @param metadata the metadata that accompanied the request, passed through to the biz service
+     * @return a success response carrying the position returned by the biz service
+     * @throws AdminServerRuntimeException with {@code BAD_REQUEST} when validation fails
+     */
+    @Override
+    protected FetchPositionResponse handler(FetchPositionRequest request, Metadata metadata) {
+        requireValid(request);
+        return FetchPositionResponse.successResponse(positionBizService.getPosition(request, metadata));
+    }
+
+    /** Rejects requests that lack a data source type or carry a blank job id. */
+    private static void requireValid(FetchPositionRequest request) {
+        final boolean missingType = request.getDataSourceType() == null;
+        if (missingType) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty");
+        }
+        final boolean blankJobId = StringUtils.isBlank(request.getJobID());
+        if (blankJobId) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
+        }
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java
new file mode 100644
index 000000000..5011158f3
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportHeartBeatHandler.java
@@ -0,0 +1,50 @@
+package com.apache.eventmesh.admin.server.web.handler.impl;
+
+import com.apache.eventmesh.admin.server.web.db.DBThreadPool;
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
+import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import 
com.apache.eventmesh.admin.server.web.service.heatbeat.EventMeshRuntimeHeartbeatBizService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
+import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link ReportHeartBeatRequest}: hands the persistence work to the {@link DBThreadPool}
+ * executor and acknowledges the request without waiting for it.
+ */
+@Component
+@Slf4j
+public class ReportHeartBeatHandler extends BaseRequestHandler<ReportHeartBeatRequest, EmptyAckResponse> {
+    @Autowired
+    EventMeshRuntimeHeartbeatBizService heartbeatBizService;
+
+    @Autowired
+    DBThreadPool executor;
+
+    /**
+     * Submits the heartbeat for storage and returns an empty acknowledgement.
+     *
+     * @param request  the heartbeat reported by a runtime
+     * @param metadata the metadata that accompanied the request (not used here)
+     * @return a new {@link EmptyAckResponse}
+     */
+    @Override
+    protected EmptyAckResponse handler(ReportHeartBeatRequest request, Metadata metadata) {
+        executor.getExecutors().execute(() -> persistHeartbeat(request));
+        return new EmptyAckResponse();
+    }
+
+    /**
+     * Converts the request into a heartbeat entity and saves or updates it. Failures are logged
+     * and never propagated; a non-numeric job id aborts the task.
+     */
+    private void persistHeartbeat(ReportHeartBeatRequest request) {
+        EventMeshRuntimeHeartbeat record = new EventMeshRuntimeHeartbeat();
+        final int jobId;
+        try {
+            jobId = Integer.parseInt(request.getJobID());
+        } catch (NumberFormatException e) {
+            log.warn("runtime {} report heartbeat fail, illegal job id {}", request.getAddress(), request.getJobID());
+            return;
+        }
+        record.setJobID(jobId);
+        record.setReportTime(request.getReportedTimeStamp());
+        record.setAdminAddr(IPUtils.getLocalAddress());
+        record.setRuntimeAddr(request.getAddress());
+        try {
+            boolean stored = heartbeatBizService.saveOrUpdateByRuntimeAddress(record);
+            if (!stored) {
+                log.warn("save or update heartbeat request [{}] fail", request);
+            }
+        } catch (Exception e) {
+            log.warn("save or update heartbeat request [{}] fail", request, e);
+        }
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
new file mode 100644
index 000000000..168b4980b
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/handler/impl/ReportPositionHandler.java
@@ -0,0 +1,81 @@
+package com.apache.eventmesh.admin.server.web.handler.impl;
+
+import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import com.apache.eventmesh.admin.server.web.db.DBThreadPool;
+import com.apache.eventmesh.admin.server.web.handler.BaseRequestHandler;
+import 
com.apache.eventmesh.admin.server.web.service.job.EventMeshJobInfoBizService;
+import 
com.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+import org.apache.eventmesh.common.remote.response.EmptyAckResponse;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Handles {@link ReportPositionRequest}: validates the request in the calling thread, then hands
+ * the position report and the job-state update to the {@link DBThreadPool} executor.
+ */
+@Component
+@Slf4j
+public class ReportPositionHandler extends BaseRequestHandler<ReportPositionRequest, EmptyAckResponse> {
+    @Autowired
+    EventMeshJobInfoBizService jobInfoBizService;
+
+    @Autowired
+    DBThreadPool executor;
+
+    @Autowired
+    EventMeshPositionBizService positionBizService;
+
+
+    /**
+     * Validates the request, schedules the report and acknowledges.
+     *
+     * @param request  the position report from a runtime
+     * @param metadata the metadata that accompanied the request, passed through to the biz service
+     * @return a new {@link EmptyAckResponse}
+     * @throws AdminServerRuntimeException with {@code BAD_REQUEST} when the data source type is
+     *                                     missing, the job id is blank or not an integer, or the
+     *                                     record position list is null or empty
+     */
+    @Override
+    protected EmptyAckResponse handler(ReportPositionRequest request, Metadata metadata) {
+        requirePresentFields(request);
+        final int jobID = parseJobId(request);
+
+        positionBizService.isValidatePositionRequest(request.getDataSourceType());
+
+        executor.getExecutors().execute(() -> reportThenSyncState(request, metadata, jobID));
+        return new EmptyAckResponse();
+    }
+
+    /** Rejects requests missing the data source type, the job id or the record positions. */
+    private static void requirePresentFields(ReportPositionRequest request) {
+        if (request.getDataSourceType() == null) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal data type, it's empty");
+        }
+        if (StringUtils.isBlank(request.getJobID())) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal job id, it's empty");
+        }
+        if (request.getRecordPositionList() == null || request.getRecordPositionList().isEmpty()) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "illegal record position list, it's empty");
+        }
+    }
+
+    /** Parses the request's job id, mapping a malformed value to a {@code BAD_REQUEST} error. */
+    private static int parseJobId(ReportPositionRequest request) {
+        try {
+            return Integer.parseInt(request.getJobID());
+        } catch (NumberFormatException e) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, String.format("illegal job id [%s] format",
+                    request.getJobID()));
+        }
+    }
+
+    /**
+     * Background task: reports the position, then updates the job state whether or not the
+     * report succeeded. Every failure is logged and swallowed.
+     */
+    private void reportThenSyncState(ReportPositionRequest request, Metadata metadata, int jobID) {
+        try {
+            boolean reported = positionBizService.reportPosition(request, metadata);
+            if (!reported) {
+                log.warn("handle runtime [{}] report data type [{}] job [{}] position [{}] fail",
+                        request.getAddress(), request.getDataSourceType(), request.getJobID(),
+                        request.getRecordPositionList());
+            } else if (log.isDebugEnabled()) {
+                log.debug("handle runtime [{}] report data type [{}] job [{}] position [{}] success",
+                        request.getAddress(), request.getDataSourceType(), request.getJobID(),
+                        request.getRecordPositionList());
+            }
+        } catch (Exception e) {
+            log.warn("handle position request fail, request [{}]", request, e);
+        } finally {
+            syncJobState(request, jobID);
+        }
+    }
+
+    /** Writes the state carried by the request onto the job; logs instead of throwing. */
+    private void syncJobState(ReportPositionRequest request, int jobID) {
+        try {
+            boolean updated = jobInfoBizService.updateJobState(jobID, request.getState());
+            if (!updated) {
+                log.warn("update job [{}] state to [{}] fail", jobID, request.getState());
+            }
+        } catch (Exception e) {
+            log.warn("update job id [{}] type [{}] state [{}] fail", request.getJobID(),
+                    request.getDataSourceType(), request.getState(), e);
+        }
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java
new file mode 100644
index 000000000..f5fd4ae8c
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/heatbeat/EventMeshRuntimeHeartbeatBizService.java
@@ -0,0 +1,63 @@
+package com.apache.eventmesh.admin.server.web.service.heatbeat;
+
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat;
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHeartbeatService;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshRuntimeHistoryService;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Business-level operations on runtime heartbeat records.
+ *
+ * @author sodafang
+ * @description Database operation service implementation for table [event_mesh_runtime_heartbeat]
+ * @createDate 2024-05-14 17:15:03
+ */
+@Service
+@Slf4j
+public class EventMeshRuntimeHeartbeatBizService {
+
+    @Autowired
+    EventMeshRuntimeHistoryService historyService;
+
+    @Autowired
+    EventMeshRuntimeHeartbeatService heartbeatService;
+
+    /**
+     * Inserts the heartbeat when no record exists for its runtime address, otherwise updates the
+     * existing record if the incoming report time is strictly newer. When the stored job id
+     * differs from the incoming one, a history row for the old job is saved as well.
+     *
+     * @param entity the heartbeat to persist; its report time must be a decimal long
+     * @return the result of the save/update, or {@code true} when the report is stale and ignored
+     * @throws NumberFormatException when either report time cannot be parsed as a long
+     */
+    public boolean saveOrUpdateByRuntimeAddress(EventMeshRuntimeHeartbeat entity) {
+        EventMeshRuntimeHeartbeat old = heartbeatService.getOne(Wrappers.<EventMeshRuntimeHeartbeat>query().eq(
+                "runtimeAddr",
+                entity.getRuntimeAddr()));
+        if (old == null) {
+            return heartbeatService.save(entity);
+        }
+
+        if (Long.parseLong(old.getReportTime()) >= Long.parseLong(entity.getReportTime())) {
+            // Fix: the message used to claim the current report time was later than the stored
+            // one, which is the opposite of the condition checked here.
+            log.info("update heartbeat record ignore, current report time is not later than db, job "
+                    + "[{}], remote [{}]", entity.getJobID(), entity.getRuntimeAddr());
+            return true;
+        }
+
+        try {
+            // Fix: constrain the update to this runtime's row. Matching on updateTime alone would
+            // also overwrite any other runtime's record that happens to share the same timestamp.
+            // updateTime is kept as the optimistic check against a concurrent modification.
+            return heartbeatService.update(entity, Wrappers.<EventMeshRuntimeHeartbeat>update()
+                    .eq("runtimeAddr", entity.getRuntimeAddr())
+                    .eq("updateTime", old.getUpdateTime()));
+        } finally {
+            // Runs whether or not the update succeeded.
+            if (old.getJobID() != null && !old.getJobID().equals(entity.getJobID())) {
+                EventMeshRuntimeHistory history = new EventMeshRuntimeHistory();
+                // NOTE(review): the history row stores the admin address, not the runtime
+                // address - confirm this is intended.
+                history.setAddress(old.getAdminAddr());
+                history.setJob(old.getJobID());
+                try {
+                    historyService.save(history);
+                } catch (Exception e) {
+                    log.warn("save runtime job changed history fail", e);
+                }
+
+                log.info("runtime [{}] changed job, old job [{}], now [{}]", entity.getRuntimeAddr(), old.getJobID(),
+                        entity.getJobID());
+            }
+        }
+    }
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java
new file mode 100644
index 000000000..923dc086d
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/job/EventMeshJobInfoBizService.java
@@ -0,0 +1,111 @@
+package com.apache.eventmesh.admin.server.web.service.job;
+
+import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource;
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobDetail;
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshDataSourceService;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshJobInfoService;
+import 
com.apache.eventmesh.admin.server.web.service.position.EventMeshPositionBizService;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.fasterxml.jackson.core.type.TypeReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.JobState;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.job.DataSourceType;
+import org.apache.eventmesh.common.remote.job.JobTransportType;
+import org.apache.eventmesh.common.remote.request.FetchJobRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * Business service for jobs stored in table event_mesh_job_info: changes a job's state and
+ * assembles the {@link EventMeshJobDetail} (connector configs, position, state, transport type)
+ * for a job.
+ *
+ * @author sodafang
+ * @createDate 2024-05-09 15:51:45
+ */
+@Service
+@Slf4j
+public class EventMeshJobInfoBizService {
+
+    @Autowired
+    EventMeshJobInfoService jobInfoService;
+
+    @Autowired
+    EventMeshDataSourceService dataSourceService;
+
+    @Autowired
+    EventMeshPositionBizService positionBizService;
+
+    /**
+     * Moves a single job to the given state unless the job is already DELETE or COMPLETE.
+     *
+     * @param jobID id of the job to update
+     * @param state new state; it is persisted as its ordinal
+     * @return {@code false} when either argument is {@code null}, otherwise {@code true}
+     *         (whether a row was actually changed is not reported)
+     */
+    public boolean updateJobState(Integer jobID, JobState state) {
+        if (jobID == null || state == null) {
+            return false;
+        }
+        EventMeshJobInfo jobInfo = new EventMeshJobInfo();
+        jobInfo.setJobID(jobID);
+        jobInfo.setState(state.ordinal());
+        // The WHERE clause comes from the wrapper only, so the job must be selected explicitly;
+        // without the jobID predicate every job that is not DELETE/COMPLETE would be updated.
+        jobInfoService.update(jobInfo, Wrappers.<EventMeshJobInfo>update()
+                .eq("jobID", jobID)
+                .notIn("state", JobState.DELETE.ordinal(), JobState.COMPLETE.ordinal()));
+        return true;
+    }
+
+    /**
+     * Loads a job together with its source/sink data source configuration and current position.
+     *
+     * @param request  fetch request carrying the job id
+     * @param metadata request metadata; not used by this method
+     * @return the job detail, or {@code null} when the request is {@code null} or the job does
+     *         not exist
+     * @throws AdminServerRuntimeException with {@code BAD_DB_DATA} when a stored connector
+     *         configuration cannot be parsed or the stored job state is unknown
+     */
+    public EventMeshJobDetail getJobDetail(FetchJobRequest request, Metadata metadata) {
+        if (request == null) {
+            return null;
+        }
+        EventMeshJobInfo job = jobInfoService.getById(request.getJobID());
+        if (job == null) {
+            return null;
+        }
+        EventMeshJobDetail detail = new EventMeshJobDetail();
+        detail.setId(job.getJobID());
+        detail.setName(job.getName());
+        EventMeshDataSource source = dataSourceService.getById(job.getSourceData());
+        EventMeshDataSource target = dataSourceService.getById(job.getTargetData());
+        if (source != null) {
+            if (!StringUtils.isBlank(source.getConfiguration())) {
+                try {
+                    detail.setSourceConnectorConfig(JsonUtils.parseTypeReferenceObject(source.getConfiguration(),
+                            new TypeReference<Map<String, Object>>() {}));
+                } catch (Exception e) {
+                    log.warn("parse source config id [{}] fail", job.getSourceData(), e);
+                    throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,"illegal source data source config");
+                }
+            }
+            detail.setSourceConnectorDesc(source.getDescription());
+            if (source.getDataType() != null) {
+                // The position is looked up by the source's data source type.
+                detail.setPosition(positionBizService.getPositionByJobID(job.getJobID(),
+                        DataSourceType.getDataSourceType(source.getDataType())));
+            }
+        }
+        if (target != null) {
+            if (!StringUtils.isBlank(target.getConfiguration())) {
+                try {
+                    detail.setSinkConnectorConfig(JsonUtils.parseTypeReferenceObject(target.getConfiguration(),
+                            new TypeReference<Map<String, Object>>() {}));
+                } catch (Exception e) {
+                    // Log the id of the target data source, which is the one that failed to parse.
+                    log.warn("parse sink config id [{}] fail", job.getTargetData(), e);
+                    throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,"illegal target data sink config");
+                }
+            }
+            detail.setSinkConnectorDesc(target.getDescription());
+        }
+
+        JobState state = JobState.fromIndex(job.getState());
+        if (state == null) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_DB_DATA,"illegal job state in db");
+        }
+        detail.setState(state);
+        detail.setTransportType(JobTransportType.getJobTransportType(job.getTransportType()));
+        return detail;
+    }
+}
+
+
+
+
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
new file mode 100644
index 000000000..03014389a
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java
@@ -0,0 +1,61 @@
+package com.apache.eventmesh.admin.server.web.service.position;
+
+import com.apache.eventmesh.admin.server.AdminServerRuntimeException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.exception.ErrorCode;
+import org.apache.eventmesh.common.remote.job.DataSourceType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class EventMeshPositionBizService {
+    @Autowired
+    PositionHandlerFactory factory;
+
+    // called isValidateReportRequest before call this
+    public RecordPosition getPosition(FetchPositionRequest request, Metadata 
metadata) {
+        if (request == null) {
+            return null;
+        }
+        isValidatePositionRequest(request.getDataSourceType());
+        IFetchPositionHandler handler = 
factory.getHandler(request.getDataSourceType());
+        return handler.handler(request, metadata);
+    }
+
+    public void isValidatePositionRequest(DataSourceType type) {
+        if (type == null) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, "data 
source type is null");
+        }
+        IReportPositionHandler handler = factory.getHandler(type);
+        if (handler == null) {
+            throw new AdminServerRuntimeException(ErrorCode.BAD_REQUEST, 
String.format("illegal data base " +
+                            "type [%s], it not match any report position 
handler", type));
+        }
+    }
+
+    // called isValidateReportRequest before call this
+    public boolean reportPosition(ReportPositionRequest request, Metadata 
metadata) {
+        if (request == null) {
+            return false;
+        }
+        isValidatePositionRequest(request.getDataSourceType());
+        IReportPositionHandler handler = 
factory.getHandler(request.getDataSourceType());
+        return handler.handler(request, metadata);
+    }
+
+    public RecordPosition getPositionByJobID(Integer jobID, DataSourceType 
type) {
+        if (jobID == null || type == null) {
+            return null;
+        }
+        isValidatePositionRequest(type);
+        PositionHandler handler = factory.getHandler(type);
+        FetchPositionRequest request = new FetchPositionRequest();
+        request.setJobID(String.valueOf(jobID));
+        return handler.handler(request, null);
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
new file mode 100644
index 000000000..521f07d1b
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java
@@ -0,0 +1,9 @@
+package com.apache.eventmesh.admin.server.web.service.position;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+
+/**
+ * Fetch side of a position handler: answers a {@link FetchPositionRequest} with a
+ * {@link RecordPosition}.
+ */
+public interface IFetchPositionHandler {
+    /**
+     * Handles a fetch-position request.
+     *
+     * @param request  the fetch request
+     * @param metadata request metadata; EventMeshPositionBizService#getPositionByJobID passes
+     *                 {@code null} here
+     * @return the position found; the MySQL implementation returns {@code null} when no
+     *         position row exists for the job
+     */
+    RecordPosition handler(FetchPositionRequest request, Metadata metadata);
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java
new file mode 100644
index 000000000..5c7d1ab41
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/IReportPositionHandler.java
@@ -0,0 +1,8 @@
+package com.apache.eventmesh.admin.server.web.service.position;
+
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+
+/**
+ * Report side of a position handler: accepts a {@link ReportPositionRequest}.
+ */
+public interface IReportPositionHandler {
+    /**
+     * Handles a report-position request.
+     *
+     * @param request  the report request
+     * @param metadata request metadata
+     * @return the implementation's success flag; presumably {@code true} means the report was
+     *         accepted (confirm against the implementation in use)
+     */
+    boolean handler(ReportPositionRequest request, Metadata metadata);
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java
new file mode 100644
index 000000000..d8ca71797
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandler.java
@@ -0,0 +1,7 @@
+package com.apache.eventmesh.admin.server.web.service.position;
+
+import org.apache.eventmesh.common.remote.job.DataSourceType;
+
+/**
+ * Base class for handlers that can both report and fetch positions for one data source type.
+ */
+public abstract class PositionHandler implements 
IReportPositionHandler,IFetchPositionHandler {
+    /**
+     * Returns the data source type this handler serves; PositionHandlerFactory uses it as the
+     * registry key.
+     */
+    protected abstract DataSourceType getSourceType();
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java
new file mode 100644
index 000000000..6dedb1e3d
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/PositionHandlerFactory.java
@@ -0,0 +1,35 @@
+package com.apache.eventmesh.admin.server.web.service.position;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.remote.job.DataSourceType;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Registry of {@link PositionHandler} beans keyed by the data source type they serve. It is
+ * filled from the application context each time a {@link ContextRefreshedEvent} arrives.
+ */
+@Component
+@Slf4j
+public class PositionHandlerFactory implements ApplicationListener<ContextRefreshedEvent> {
+    private final Map<DataSourceType, PositionHandler> handlers = new ConcurrentHashMap<>();
+
+    /**
+     * Looks up the handler registered for a data source type.
+     *
+     * @param type data source type
+     * @return the registered handler, or {@code null} when none is registered
+     */
+    public PositionHandler getHandler(DataSourceType type) {
+        return handlers.get(type);
+    }
+
+    /**
+     * Registers every {@link PositionHandler} bean of the refreshed context. The first handler
+     * seen for a type wins; later ones for the same type are only logged.
+     */
+    @Override
+    public void onApplicationEvent(ContextRefreshedEvent event) {
+        Map<String, PositionHandler> discovered =
+                event.getApplicationContext().getBeansOfType(PositionHandler.class);
+        for (PositionHandler candidate : discovered.values()) {
+            DataSourceType sourceType = candidate.getSourceType();
+            // putIfAbsent leaves an existing registration untouched and reports it.
+            if (handlers.putIfAbsent(sourceType, candidate) != null) {
+                log.warn("data source type [{}] handler already exists", sourceType);
+            }
+        }
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
new file mode 100644
index 000000000..f72b61f5d
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java
@@ -0,0 +1,140 @@
+package com.apache.eventmesh.admin.server.web.service.position.impl;
+
+import com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition;
+import 
com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshMysqlPositionService;
+import 
com.apache.eventmesh.admin.server.web.db.service.EventMeshPositionReporterHistoryService;
+import com.apache.eventmesh.admin.server.web.service.position.PositionHandler;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.protocol.grpc.adminserver.Metadata;
+import org.apache.eventmesh.common.remote.job.DataSourceType;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.remote.offset.canal.CanalRecordOffset;
+import org.apache.eventmesh.common.remote.offset.canal.CanalRecordPartition;
+import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
+import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DuplicateKeyException;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * {@link PositionHandler} for MySQL sources: stores and reads the canal position of a job in
+ * table event_mesh_mysql_position (one row per job).
+ */
+@Component
+@Slf4j
+public class MysqlPositionHandler extends PositionHandler {
+    /** Attempts made for one report when concurrent writers collide on the job's unique key. */
+    private static final int MAX_REPORT_ATTEMPTS = 3;
+
+    /** Pause between two attempts, in milliseconds. */
+    private static final long RETRY_INTERVAL_MILLIS = 200L;
+
+    @Autowired
+    EventMeshMysqlPositionService positionService;
+
+    @Autowired
+    EventMeshPositionReporterHistoryService historyService;
+
+    @Override
+    protected DataSourceType getSourceType() {
+        return DataSourceType.MYSQL;
+    }
+
+    /**
+     * Inserts the position of a job, or advances the stored one.
+     *
+     * <p>A report whose position is not greater than the stored one is ignored and counted as
+     * success. The update is guarded by the previously read {@code updateTime}, so a concurrent
+     * change makes it affect no row. When the reporting address differs from the stored one, a
+     * reporter-history row is written on a best-effort basis.
+     *
+     * @param position position to store; its job id selects the row
+     * @return result of the insert/update, or {@code true} when the report was ignored as stale
+     */
+    public boolean saveOrUpdateByJob(EventMeshMysqlPosition position) {
+        EventMeshMysqlPosition old = positionService.getOne(
+                Wrappers.<EventMeshMysqlPosition>query().eq("jobID", position.getJobID()));
+        if (old == null) {
+            return positionService.save(position);
+        }
+        if (old.getPosition() >= position.getPosition()) {
+            log.info("job [{}] report position [{}] by runtime [{}] less than db position [{}] by [{}]",
+                    position.getJobID(), position.getPosition(), position.getAddress(), old.getPosition(),
+                    old.getAddress());
+            return true;
+        }
+        try {
+            // Select the job's own row; updateTime alone could also match rows of other jobs
+            // that happened to be written within the same second.
+            return positionService.update(position, Wrappers.<EventMeshMysqlPosition>update()
+                    .eq("jobID", old.getJobID())
+                    .eq("updateTime", old.getUpdateTime()));
+        } finally {
+            if (old.getAddress() != null && !old.getAddress().equals(position.getAddress())) {
+                EventMeshPositionReporterHistory history = new EventMeshPositionReporterHistory();
+                // NOTE(review): the record holds the NEW position while job/address come from the
+                // old row; the table comment speaks of the old record - confirm which is intended.
+                history.setRecord(JsonUtils.toJSONString(position));
+                history.setJob(old.getJobID());
+                history.setAddress(old.getAddress());
+                log.info("job [{}] position reporter changed old [{}], now [{}]", position.getJobID(), old, position);
+                try {
+                    historyService.save(history);
+                } catch (Exception e) {
+                    // History is informational only; never fail the report because of it.
+                    log.warn("save job [{}] mysql position reporter changed history fail, now reporter [{}], old "
+                            + "[{}]", position.getJobID(), position.getAddress(), old.getAddress(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores the first record position of a report request, retrying when a concurrent insert
+     * for the same job raises {@link DuplicateKeyException}.
+     *
+     * @param request  report request; only the first entry of its position list is used
+     * @param metadata request metadata; not used
+     * @return {@code true} when the position was stored (or ignored as stale); {@code false} when
+     *         the request is malformed, storing failed, all attempts collided, or the thread was
+     *         interrupted while waiting to retry
+     */
+    @Override
+    public boolean handler(ReportPositionRequest request, Metadata metadata) {
+        for (int attempt = 0; attempt < MAX_REPORT_ATTEMPTS; attempt++) {
+            try {
+                List<RecordPosition> recordPositionList = request.getRecordPositionList();
+                if (recordPositionList == null || recordPositionList.isEmpty()) {
+                    log.warn("report mysql position, but record position list is empty");
+                    return false;
+                }
+                RecordPosition recordPosition = recordPositionList.get(0);
+                if (recordPosition == null || recordPosition.getRecordPartition() == null
+                        || recordPosition.getRecordOffset() == null) {
+                    log.warn("report mysql position, but record-partition/partition/offset is null");
+                    return false;
+                }
+                if (!(recordPosition.getRecordPartition() instanceof CanalRecordPartition)) {
+                    log.warn("report mysql position, but record partition class [{}] not match [{}]",
+                            recordPosition.getRecordPartition().getRecordPartitionClass(), CanalRecordPartition.class);
+                    return false;
+                }
+                if (!(recordPosition.getRecordOffset() instanceof CanalRecordOffset)) {
+                    log.warn("report mysql position, but record offset class [{}] not match [{}]",
+                            recordPosition.getRecordOffset().getRecordOffsetClass(), CanalRecordOffset.class);
+                    return false;
+                }
+                // Both casts are safe (and non-null) after the instanceof checks above.
+                CanalRecordOffset offset = (CanalRecordOffset) recordPosition.getRecordOffset();
+                CanalRecordPartition partition = (CanalRecordPartition) recordPosition.getRecordPartition();
+                EventMeshMysqlPosition position = new EventMeshMysqlPosition();
+                position.setJobID(Integer.parseInt(request.getJobID()));
+                position.setAddress(request.getAddress());
+                position.setPosition(offset.getOffset());
+                position.setTimestamp(partition.getTimeStamp());
+                position.setJournalName(partition.getJournalName());
+                if (!saveOrUpdateByJob(position)) {
+                    log.warn("update job position fail [{}]", request);
+                    return false;
+                }
+                return true;
+            } catch (DuplicateKeyException e) {
+                // Another reporter inserted the row first; the next attempt takes the update path.
+                log.warn("concurrent report position job [{}], it will try again", request.getJobID());
+            } catch (Exception e) {
+                log.warn("save position job [{}] fail", request.getJobID(), e);
+                return false;
+            }
+            if (attempt + 1 >= MAX_REPORT_ATTEMPTS) {
+                // No further attempt follows, so waiting would only delay the failure.
+                break;
+            }
+            try {
+                Thread.sleep(RETRY_INTERVAL_MILLIS);
+            } catch (InterruptedException e) {
+                // Nothing was saved, so do not report success; keep the interrupt visible to callers.
+                Thread.currentThread().interrupt();
+                log.warn("save position thread interrupted, [{}]", request);
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Reads the stored position of the request's job.
+     *
+     * @param request  fetch request carrying the job id
+     * @param metadata request metadata; not used
+     * @return a {@link RecordPosition} built from the stored row, or {@code null} when the job
+     *         has no stored position
+     */
+    @Override
+    public RecordPosition handler(FetchPositionRequest request, Metadata metadata) {
+        EventMeshMysqlPosition position = positionService.getOne(
+                Wrappers.<EventMeshMysqlPosition>query().eq("jobID", request.getJobID()));
+        RecordPosition recordPosition = null;
+        if (position != null) {
+            CanalRecordPartition partition = new CanalRecordPartition();
+            partition.setTimeStamp(position.getTimestamp());
+            partition.setJournalName(position.getJournalName());
+            CanalRecordOffset offset = new CanalRecordOffset();
+            offset.setOffset(position.getPosition());
+            recordPosition = new RecordPosition();
+            recordPosition.setRecordPartition(partition);
+            recordPosition.setRecordOffset(offset);
+        }
+        return recordPosition;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService
 
b/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService
deleted file mode 100644
index 656fec8f3..000000000
--- 
a/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-nacos=com.apache.eventmesh.admin.server.registry.NacosDiscoveryService
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/resources/META-INF/spring.factories 
b/eventmesh-admin-server/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..ced02f95f
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+  com.apache.eventmesh.admin.server.AdminServerProperties
\ No newline at end of file
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml 
b/eventmesh-admin-server/src/main/resources/application.yaml
index aa72432b6..397c3bb42 100644
--- a/eventmesh-admin-server/src/main/resources/application.yaml
+++ b/eventmesh-admin-server/src/main/resources/application.yaml
@@ -1,8 +1,15 @@
 spring:
   datasource:
-    url: 
jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false
-    username: sodafang
-    password: asdfasdf
+    url: 
jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
+    username: root
+    password: default
     driver-class-name: com.mysql.cj.jdbc.Driver
-mybatis:
-  mapper-locations: classpath:mapper/*.xml
\ No newline at end of file
+mybatis-plus:
+  mapper-locations: classpath:mapper/*.xml
+  configuration:
+    map-underscore-to-camel-case: false
+    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
+event-mesh:
+  admin-server:
+    service-name: DEFAULT_GROUP@@em_adm_server
+    port: 8081
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties 
b/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties
new file mode 100644
index 000000000..ceb1f59b3
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties
@@ -0,0 +1,2 @@
+eventMesh.server.retry.plugin.type=nacos
+eventMesh.registry.plugin.server-addr=localhost:8848
diff --git a/eventmesh-admin-server/src/main/resources/eventmesh.sql 
b/eventmesh-admin-server/src/main/resources/eventmesh.sql
new file mode 100644
index 000000000..37217c40b
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/eventmesh.sql
@@ -0,0 +1,114 @@
+-- --------------------------------------------------------
+-- 主机:                           127.0.0.1
+-- 服务器版本:                        8.0.36 - MySQL Community Server - GPL
+-- 服务器操作系统:                      Win64
+-- HeidiSQL 版本:                  11.3.0.6295
+-- --------------------------------------------------------
+
+/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
+/*!40101 SET NAMES utf8 */;
+/*!50503 SET NAMES utf8mb4 */;
+/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, 
FOREIGN_KEY_CHECKS=0 */;
+/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
+/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
+
+
+-- Export the database structure of eventmesh
+CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET 
utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
+USE `eventmesh`;
+
+-- Export the structure of table eventmesh.event_mesh_data_source
+-- Rows are referenced by event_mesh_job_info.sourceData / targetData.
+CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
+  `id` int unsigned NOT NULL AUTO_INCREMENT,
+  `dataType` int unsigned NOT NULL,
+  `description` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
DEFAULT NULL,
+  `configuration` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
+  `createUid` int NOT NULL,
+  `updateUid` int NOT NULL,
+  `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`) USING BTREE
+) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
+
+-- Data export was deselected.
+
+-- Export the structure of table eventmesh.event_mesh_job_info
+-- sourceData / targetData hold ids of rows in event_mesh_data_source.
+CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
+  `jobID` int unsigned NOT NULL AUTO_INCREMENT,
+  `name` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
+  `transportType` int unsigned DEFAULT NULL COMMENT 'JobTransportType',
+  `sourceData` int unsigned DEFAULT NULL COMMENT 'data_source表',
+  `targetData` int unsigned DEFAULT NULL,
+  `state` tinyint unsigned NOT NULL COMMENT 'JobState',
+  `jobType` tinyint unsigned NOT NULL COMMENT 'connector,mesh,func,...',
+  `createUid` int unsigned NOT NULL,
+  `updateUid` int unsigned NOT NULL,
+  `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
+  PRIMARY KEY (`jobID`) USING BTREE
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
+
+-- Data export was deselected.
+
+-- Export the structure of table eventmesh.event_mesh_mysql_position
+-- At most one position row per job (unique key on jobID).
+CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
+  `id` int unsigned NOT NULL AUTO_INCREMENT,
+  `jobID` int unsigned NOT NULL,
+  `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
+  `position` bigint DEFAULT NULL,
+  `timestamp` bigint DEFAULT NULL,
+  `journalName` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
+  `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `jobID` (`jobID`)
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
+
+-- Data export was deselected.
+
+-- Export the structure of table eventmesh.event_mesh_position_reporter_history
+-- Written when the address reporting a job's position changes.
+CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
+  `id` bigint NOT NULL AUTO_INCREMENT,
+  `job` int NOT NULL,
+  `record` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
+  `address` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
+  `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  KEY `job` (`job`),
+  KEY `address` (`address`)
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';
+
+-- Data export was deselected.
+
+-- Export the structure of table eventmesh.event_mesh_runtime_heartbeat
+-- At most one heartbeat row per runtime address (unique key on runtimeAddr).
+CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
+  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
+  `adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL,
+  `runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci 
NOT NULL,
+  `jobID` int unsigned DEFAULT NULL,
+  `reportTime` varchar(50) COLLATE utf8mb4_general_ci NOT NULL COMMENT 
'runtime本地上报时间',
+  `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE 
CURRENT_TIMESTAMP,
+  `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
+  KEY `jobID` (`jobID`)
+) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci;
+
+-- Data export was deselected.
+
+-- Export the structure of table eventmesh.event_mesh_runtime_history
+-- Written when the job running on a runtime changes.
+CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
+  `id` bigint NOT NULL AUTO_INCREMENT,
+  `job` int NOT NULL,
+  `address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT 
NULL DEFAULT '',
+  `createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY (`id`),
+  KEY `address` (`address`)
+) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 
COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';
+
+-- Data export was deselected.
+
+/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
+/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
+/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
+/*!40111 SET SQL_NOTES=IFNULL(@OLD_SQL_NOTES, 1) */;
diff --git 
a/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml
 
b/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml
new file mode 100644
index 000000000..540980646
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.apache.eventmesh.admin.server.web.db.mapper.EventMeshDataSourceMapper">
+
+    <!-- Column-to-property mapping for table event_mesh_data_source. -->
+    <resultMap id="BaseResultMap" type="com.apache.eventmesh.admin.server.web.db.entity.EventMeshDataSource">
+            <id property="id" column="id" jdbcType="INTEGER"/>
+            <result property="dataType" column="dataType" jdbcType="INTEGER"/>
+            <result property="description" column="description" jdbcType="VARCHAR"/>
+            <result property="configuration" column="configuration" jdbcType="VARCHAR"/>
+            <result property="createUid" column="createUid" jdbcType="INTEGER"/>
+            <result property="updateUid" column="updateUid" jdbcType="INTEGER"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+            <result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <!-- Must list exactly the table's columns; the table has no address column. -->
+    <sql id="Base_Column_List">
+        id,dataType,
+        description,configuration,createUid,
+        updateUid,createTime,updateTime
+    </sql>
+</mapper>
diff --git 
a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml 
b/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml
new file mode 100644
index 000000000..fd8193b77
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.apache.eventmesh.admin.server.web.db.mapper.EventMeshJobInfoMapper">
+
+    <!-- Column-to-property mapping for table event_mesh_job_info. -->
+    <resultMap id="BaseResultMap" type="com.apache.eventmesh.admin.server.web.db.entity.EventMeshJobInfo">
+            <id property="jobID" column="jobID" jdbcType="INTEGER"/>
+            <result property="name" column="name" jdbcType="VARCHAR"/>
+            <result property="transportType" column="transportType" jdbcType="INTEGER"/>
+            <result property="sourceData" column="sourceData" jdbcType="INTEGER"/>
+            <result property="targetData" column="targetData" jdbcType="INTEGER"/>
+            <result property="state" column="state" jdbcType="TINYINT"/>
+            <result property="jobType" column="jobType" jdbcType="TINYINT"/>
+            <result property="createUid" column="createUid" jdbcType="INTEGER"/>
+            <result property="updateUid" column="updateUid" jdbcType="INTEGER"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+            <result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <!-- Must list exactly the table's columns; the column is named jobType, not runtimeType. -->
+    <sql id="Base_Column_List">
+        jobID,name,transportType,
+        sourceData,targetData,state,
+        jobType,createUid,
+        updateUid,createTime,updateTime
+    </sql>
+</mapper>
diff --git 
a/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml
 
b/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml
new file mode 100644
index 000000000..9851315a3
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.apache.eventmesh.admin.server.web.db.mapper.EventMeshMysqlPositionMapper">
+    <!-- MyBatis XML definitions bound to the EventMeshMysqlPositionMapper interface. -->
+
+    <!-- Maps each result column onto the same-named property of EventMeshMysqlPosition; id is the id column. -->
+    <resultMap id="BaseResultMap" type="com.apache.eventmesh.admin.server.web.db.entity.EventMeshMysqlPosition">
+            <id property="id" column="id" jdbcType="INTEGER"/>
+            <result property="jobID" column="jobID" jdbcType="INTEGER"/>
+            <result property="address" column="address" jdbcType="VARCHAR"/>
+            <result property="position" column="position" jdbcType="BIGINT"/>
+            <result property="timestamp" column="timestamp" jdbcType="BIGINT"/>
+            <result property="journalName" column="journalName" jdbcType="VARCHAR"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+            <result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <!-- Reusable column list; it names the same columns that BaseResultMap maps. -->
+    <sql id="Base_Column_List">
+        id,jobID,address,
+        position,timestamp,journalName,
+        createTime,updateTime
+    </sql>
+</mapper>
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml
new file mode 100644
index 000000000..624b5f9ea
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.apache.eventmesh.admin.server.web.db.mapper.EventMeshPositionReporterHistoryMapper">
+    <!-- MyBatis XML definitions bound to the EventMeshPositionReporterHistoryMapper interface. -->
+
+    <!-- Maps each result column onto the same-named property of EventMeshPositionReporterHistory; id is the id column. -->
+    <resultMap id="BaseResultMap" type="com.apache.eventmesh.admin.server.web.db.entity.EventMeshPositionReporterHistory">
+            <id property="id" column="id" jdbcType="BIGINT"/>
+            <result property="job" column="job" jdbcType="INTEGER"/>
+            <result property="record" column="record" jdbcType="VARCHAR"/>
+            <result property="address" column="address" jdbcType="VARCHAR"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <!-- Reusable column list; it names the same columns that BaseResultMap maps. -->
+    <sql id="Base_Column_List">
+        id,job,record,
+        address,createTime
+    </sql>
+</mapper>
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml
new file mode 100644
index 000000000..3b49c4858
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHeartbeatMapper">
+    <!-- MyBatis XML definitions bound to the EventMeshRuntimeHeartbeatMapper interface. -->
+
+    <!-- Maps each result column onto the same-named property of EventMeshRuntimeHeartbeat; id is the id column. -->
+    <!-- NOTE(review): reportTime is mapped as VARCHAR while updateTime/createTime use TIMESTAMP; confirm against the table schema. -->
+    <resultMap id="BaseResultMap" type="com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHeartbeat">
+            <id property="id" column="id" jdbcType="BIGINT"/>
+            <result property="adminAddr" column="adminAddr" jdbcType="VARCHAR"/>
+            <result property="runtimeAddr" column="runtimeAddr" jdbcType="VARCHAR"/>
+            <result property="jobID" column="jobID" jdbcType="INTEGER"/>
+            <result property="reportTime" column="reportTime" jdbcType="VARCHAR"/>
+            <result property="updateTime" column="updateTime" jdbcType="TIMESTAMP"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <!-- Reusable column list; it names the same columns that BaseResultMap maps. -->
+    <sql id="Base_Column_List">
+        id,adminAddr,runtimeAddr,
+        jobID,reportTime,updateTime,
+        createTime
+    </sql>
+</mapper>
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml
new file mode 100644
index 000000000..73ab65a67
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper
+        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.apache.eventmesh.admin.server.web.db.mapper.EventMeshRuntimeHistoryMapper">
+    <!-- MyBatis XML definitions bound to the EventMeshRuntimeHistoryMapper interface. -->
+
+    <!-- Maps each result column onto the same-named property of EventMeshRuntimeHistory; id is declared as the id column. -->
+    <resultMap id="BaseResultMap" type="com.apache.eventmesh.admin.server.web.db.entity.EventMeshRuntimeHistory">
+            <id property="id" column="id" jdbcType="BIGINT"/>
+            <result property="job" column="job" jdbcType="INTEGER"/>
+            <result property="address" column="address" jdbcType="VARCHAR"/>
+            <result property="createTime" column="createTime" jdbcType="TIMESTAMP"/>
+    </resultMap>
+
+    <!-- Reusable column list; it names the same columns that BaseResultMap maps. -->
+    <sql id="Base_Column_List">
+        id,job,address,
+        createTime
+    </sql>
+</mapper>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to