This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-function by this 
push:
     new 02f6d44c1 Eventmesh function admin (#4854)
02f6d44c1 is described below

commit 02f6d44c178fad471b67406b019635eb8c46c0f3
Author: sodaRyCN <[email protected]>
AuthorDate: Mon Apr 22 15:54:36 2024 +0800

    Eventmesh function admin (#4854)
    
    * own
    
    * dependency
    
    * finish registry
    
    * init
    
    * 0419
    
    * 0419
    
    * more discovery and move gRPC
    
    * fix dependency
---
 eventmesh-admin-server/build.gradle                |   2 +
 .../apache/eventmesh/admin/server/AdminServer.java |  16 +-
 .../EventMeshAdminServerConfiguration.java         |   2 +-
 .../registry/EventMeshAdminServerRegisterInfo.java |  14 -
 .../admin/server/registry/RegistryService.java     |  20 -
 .../eventmesh/admin/server/web/GrpcServer.java     |   2 +-
 .../common/config/CommonConfiguration.java         |  25 +-
 .../grpc/adminserver/AdminBiStreamServiceGrpc.java | 260 +++++++
 .../grpc/adminserver/AdminServiceGrpc.java         | 276 +++++++
 .../grpc/adminserver/EventMeshAdminService.java    |  77 ++
 .../common/protocol/grpc/adminserver/Metadata.java | 841 +++++++++++++++++++++
 .../grpc/adminserver/MetadataOrBuilder.java        |  53 ++
 .../common/protocol/grpc/adminserver/Payload.java  | 793 +++++++++++++++++++
 .../grpc/adminserver/PayloadOrBuilder.java         |  37 +
 eventmesh-registry/.gitignore                      |  42 +
 eventmesh-registry/build.gradle                    |   0
 .../eventmesh-registry-api/build.gradle            |   8 +
 .../registry/AbstractRegistryListener.java         |   2 +-
 .../apache/eventmesh/registry/QueryInstances.java  |  13 +
 .../eventmesh/registry/RegisterServerInfo.java     |  41 +
 .../org/apache/eventmesh}/registry/Registry.java   |  22 +-
 .../eventmesh}/registry/RegistryListener.java      |   2 +-
 .../apache/eventmesh/registry/RegistryService.java |  26 +
 .../registry/exception/RegistryException.java      |  11 +
 .../eventmesh-registry-nacos/build.gradle          |   8 +
 .../registry/nacos}/NacosDiscoveryService.java     | 101 ++-
 .../nacos}/NacosRegistryConfiguration.java         |   2 +-
 settings.gradle                                    |   6 +
 28 files changed, 2608 insertions(+), 94 deletions(-)

diff --git a/eventmesh-admin-server/build.gradle 
b/eventmesh-admin-server/build.gradle
index 9d8e2b99f..e3ab6611b 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -1,6 +1,8 @@
 dependencies {
        implementation project(":eventmesh-spi")
        implementation project(":eventmesh-common")
+       implementation project(":eventmesh-registry:eventmesh-registry-api")
+       implementation project(":eventmesh-registry:eventmesh-registry-nacos")
        implementation "com.alibaba.nacos:nacos-client"
        implementation ("org.springframework.boot:spring-boot-starter-web") {
                exclude group: "org.springframework.boot", module: 
"spring-boot-starter-tomcat"
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
index cdffdd21a..b4ab41a63 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
@@ -1,22 +1,21 @@
 package com.apache.eventmesh.admin.server;
 
-import 
com.apache.eventmesh.admin.server.registry.EventMeshAdminServerRegisterInfo;
-import com.apache.eventmesh.admin.server.registry.RegistryService;
 import com.apache.eventmesh.admin.server.task.Task;
 import org.apache.eventmesh.common.utils.PagedList;
+import org.apache.eventmesh.registry.RegistryService;
 
 public class AdminServer implements Admin {
 
     private RegistryService registryService;
 
-    private EventMeshAdminServerRegisterInfo registerInfo;
+//    private EventMeshAdminServerRegisterInfo registerInfo;
 
-    public AdminServer(RegistryService registryService, 
EventMeshAdminServerRegisterInfo registerInfo) {
+    public AdminServer(RegistryService registryService) {
         this.registryService = registryService;
-        this.registerInfo = registerInfo;
+//        this.registerInfo = registerInfo;
     }
 
-    public static final String ConfigurationKey = "admin-server";
+
     @Override
     public boolean createOrUpdateTask(Task task) {
         return false;
@@ -44,13 +43,12 @@ public class AdminServer implements Admin {
 
     @Override
     public void start() {
-
-        registryService.register(registerInfo);
+        registryService.register(null);
     }
 
     @Override
     public void destroy() {
-        registryService.unRegister(registerInfo);
+        registryService.unRegister(null);
         registryService.shutdown();
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java
similarity index 95%
rename from 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java
rename to 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java
index dc436b28d..aab5b7cc7 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/EventMeshAdminServerConfiguration.java
@@ -1,4 +1,4 @@
-package com.apache.eventmesh.admin.server.registry;
+package com.apache.eventmesh.admin.server;
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java
deleted file mode 100644
index c51ae6417..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package com.apache.eventmesh.admin.server.registry;
-
-import lombok.Data;
-
-import java.util.Map;
-
-@Data
-public class EventMeshAdminServerRegisterInfo {
-    private String eventMeshClusterName;
-    private String eventMeshName;
-    private String address;
-
-    private Map<String, String> metadata;
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java
deleted file mode 100644
index 0cddd009a..000000000
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.apache.eventmesh.admin.server.registry;
-
-import com.apache.eventmesh.admin.server.AdminException;
-import org.apache.eventmesh.spi.EventMeshExtensionType;
-import org.apache.eventmesh.spi.EventMeshSPI;
-
-@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
-public interface RegistryService {
-    void init() throws AdminException;
-
-    void shutdown() throws AdminException;
-
-    void subscribe(RegistryListener registryListener, String serviceName);
-
-    void unsubscribe(RegistryListener registryListener, String serviceName);
-
-    boolean register(EventMeshAdminServerRegisterInfo eventMeshRegisterInfo) 
throws AdminException;
-
-    boolean unRegister(EventMeshAdminServerRegisterInfo 
eventMeshUnRegisterInfo) throws AdminException;
-}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
index f237b8f77..fee889a89 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/GrpcServer.java
@@ -1,7 +1,7 @@
 package com.apache.eventmesh.admin.server.web;
 
 import com.apache.eventmesh.admin.server.ComponentLifeCycle;
-import 
com.apache.eventmesh.admin.server.web.generated.AdminBiStreamServiceGrpc;
+import 
org.apache.eventmesh.common.protocol.grpc.adminserver.AdminBiStreamServiceGrpc;
 import org.springframework.stereotype.Controller;
 
 @Controller
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
index 338edf353..ca20e2b33 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java
@@ -17,20 +17,17 @@
 
 package org.apache.eventmesh.common.config;
 
-import static org.apache.eventmesh.common.Constants.HTTP;
-
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.eventmesh.common.Constants;
 import org.apache.eventmesh.common.utils.IPUtils;
-
-import org.apache.commons.collections4.CollectionUtils;
+import org.assertj.core.util.Strings;
 
 import java.util.Collections;
 import java.util.List;
 
-import org.assertj.core.util.Strings;
-
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import static org.apache.eventmesh.common.Constants.HTTP;
 
 @Data
 @NoArgsConstructor
@@ -115,6 +112,18 @@ public class CommonConfiguration {
     @ConfigFiled(field = "server.retry.plugin.type")
     private String eventMeshRetryPluginType = Constants.DEFAULT;
 
+    @ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true)
+    private String registryAddr = "";
+
+    @ConfigFiled(field = "registry.plugin.type", notEmpty = true)
+    private String eventMeshRegistryPluginType = "nacos";
+
+    @ConfigFiled(field = "registry.plugin.username")
+    private String eventMeshRegistryPluginUsername = "";
+
+    @ConfigFiled(field = "registry.plugin.password")
+    private String eventMeshRegistryPluginPassword = "";
+
     public void reload() {
         this.eventMeshWebhookOrigin = "eventmesh." + eventMeshIDC;
 
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/AdminBiStreamServiceGrpc.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/AdminBiStreamServiceGrpc.java
new file mode 100644
index 000000000..21df4d9b0
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/AdminBiStreamServiceGrpc.java
@@ -0,0 +1,260 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;
+
+import static io.grpc.MethodDescriptor.generateFullMethodName;
+
+/**
+ */
[email protected](
+    value = "by gRPC proto compiler (version 1.40.0)",
+    comments = "Source: event_mesh_admin_service.proto")
[email protected]
+public final class AdminBiStreamServiceGrpc {
+
+  private AdminBiStreamServiceGrpc() {}
+
+  public static final String SERVICE_NAME = "AdminBiStreamService";
+
+  // Static method descriptors that strictly reflect the proto.
+  private static volatile io.grpc.MethodDescriptor<Payload,
+      Payload> getInvokeBiStreamMethod;
+
+  @io.grpc.stub.annotations.RpcMethod(
+      fullMethodName = SERVICE_NAME + '/' + "invokeBiStream",
+      requestType = Payload.class,
+      responseType = Payload.class,
+      methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
+  public static io.grpc.MethodDescriptor<Payload,
+      Payload> getInvokeBiStreamMethod() {
+    io.grpc.MethodDescriptor<Payload, Payload> getInvokeBiStreamMethod;
+    if ((getInvokeBiStreamMethod = 
AdminBiStreamServiceGrpc.getInvokeBiStreamMethod) == null) {
+      synchronized (AdminBiStreamServiceGrpc.class) {
+        if ((getInvokeBiStreamMethod = 
AdminBiStreamServiceGrpc.getInvokeBiStreamMethod) == null) {
+          AdminBiStreamServiceGrpc.getInvokeBiStreamMethod = 
getInvokeBiStreamMethod =
+              io.grpc.MethodDescriptor.<Payload, Payload>newBuilder()
+              .setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
+              .setFullMethodName(generateFullMethodName(SERVICE_NAME, 
"invokeBiStream"))
+              .setSampledToLocalTracing(true)
+              .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
+                  Payload.getDefaultInstance()))
+              .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
+                  Payload.getDefaultInstance()))
+              .setSchemaDescriptor(new 
AdminBiStreamServiceMethodDescriptorSupplier("invokeBiStream"))
+              .build();
+        }
+      }
+    }
+    return getInvokeBiStreamMethod;
+  }
+
+  /**
+   * Creates a new async stub that supports all call types for the service
+   */
+  public static AdminBiStreamServiceStub newStub(io.grpc.Channel channel) {
+    io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceStub> factory =
+      new io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceStub>() {
+        @Override
+        public AdminBiStreamServiceStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
+          return new AdminBiStreamServiceStub(channel, callOptions);
+        }
+      };
+    return AdminBiStreamServiceStub.newStub(factory, channel);
+  }
+
+  /**
+   * Creates a new blocking-style stub that supports unary and streaming 
output calls on the service
+   */
+  public static AdminBiStreamServiceBlockingStub newBlockingStub(
+      io.grpc.Channel channel) {
+    io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceBlockingStub> 
factory =
+      new 
io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceBlockingStub>() {
+        @Override
+        public AdminBiStreamServiceBlockingStub newStub(io.grpc.Channel 
channel, io.grpc.CallOptions callOptions) {
+          return new AdminBiStreamServiceBlockingStub(channel, callOptions);
+        }
+      };
+    return AdminBiStreamServiceBlockingStub.newStub(factory, channel);
+  }
+
+  /**
+   * Creates a new ListenableFuture-style stub that supports unary calls on 
the service
+   */
+  public static AdminBiStreamServiceFutureStub newFutureStub(
+      io.grpc.Channel channel) {
+    io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceFutureStub> 
factory =
+      new 
io.grpc.stub.AbstractStub.StubFactory<AdminBiStreamServiceFutureStub>() {
+        @Override
+        public AdminBiStreamServiceFutureStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
+          return new AdminBiStreamServiceFutureStub(channel, callOptions);
+        }
+      };
+    return AdminBiStreamServiceFutureStub.newStub(factory, channel);
+  }
+
+  /**
+   */
+  public static abstract class AdminBiStreamServiceImplBase implements 
io.grpc.BindableService {
+
+    /**
+     */
+    public io.grpc.stub.StreamObserver<Payload> invokeBiStream(
+        io.grpc.stub.StreamObserver<Payload> responseObserver) {
+      return 
io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getInvokeBiStreamMethod(),
 responseObserver);
+    }
+
+    @Override public final io.grpc.ServerServiceDefinition bindService() {
+      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
+          .addMethod(
+            getInvokeBiStreamMethod(),
+            io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
+              new MethodHandlers<
+                Payload,
+                Payload>(
+                  this, METHODID_INVOKE_BI_STREAM)))
+          .build();
+    }
+  }
+
+  /**
+   */
+  public static final class AdminBiStreamServiceStub extends 
io.grpc.stub.AbstractAsyncStub<AdminBiStreamServiceStub> {
+    private AdminBiStreamServiceStub(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      super(channel, callOptions);
+    }
+
+    @Override
+    protected AdminBiStreamServiceStub build(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      return new AdminBiStreamServiceStub(channel, callOptions);
+    }
+
+    /**
+     */
+    public io.grpc.stub.StreamObserver<Payload> invokeBiStream(
+        io.grpc.stub.StreamObserver<Payload> responseObserver) {
+      return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
+          getChannel().newCall(getInvokeBiStreamMethod(), getCallOptions()), 
responseObserver);
+    }
+  }
+
+  /**
+   */
+  public static final class AdminBiStreamServiceBlockingStub extends 
io.grpc.stub.AbstractBlockingStub<AdminBiStreamServiceBlockingStub> {
+    private AdminBiStreamServiceBlockingStub(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      super(channel, callOptions);
+    }
+
+    @Override
+    protected AdminBiStreamServiceBlockingStub build(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      return new AdminBiStreamServiceBlockingStub(channel, callOptions);
+    }
+  }
+
+  /**
+   */
+  public static final class AdminBiStreamServiceFutureStub extends 
io.grpc.stub.AbstractFutureStub<AdminBiStreamServiceFutureStub> {
+    private AdminBiStreamServiceFutureStub(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      super(channel, callOptions);
+    }
+
+    @Override
+    protected AdminBiStreamServiceFutureStub build(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      return new AdminBiStreamServiceFutureStub(channel, callOptions);
+    }
+  }
+
+  private static final int METHODID_INVOKE_BI_STREAM = 0;
+
+  private static final class MethodHandlers<Req, Resp> implements
+      io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
+      io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
+      io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
+      io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
+    private final AdminBiStreamServiceImplBase serviceImpl;
+    private final int methodId;
+
+    MethodHandlers(AdminBiStreamServiceImplBase serviceImpl, int methodId) {
+      this.serviceImpl = serviceImpl;
+      this.methodId = methodId;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> 
responseObserver) {
+      switch (methodId) {
+        default:
+          throw new AssertionError();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public io.grpc.stub.StreamObserver<Req> invoke(
+        io.grpc.stub.StreamObserver<Resp> responseObserver) {
+      switch (methodId) {
+        case METHODID_INVOKE_BI_STREAM:
+          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.invokeBiStream(
+              (io.grpc.stub.StreamObserver<Payload>) responseObserver);
+        default:
+          throw new AssertionError();
+      }
+    }
+  }
+
+  private static abstract class AdminBiStreamServiceBaseDescriptorSupplier
+      implements io.grpc.protobuf.ProtoFileDescriptorSupplier, 
io.grpc.protobuf.ProtoServiceDescriptorSupplier {
+    AdminBiStreamServiceBaseDescriptorSupplier() {}
+
+    @Override
+    public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
+      return EventMeshAdminService.getDescriptor();
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.ServiceDescriptor 
getServiceDescriptor() {
+      return getFileDescriptor().findServiceByName("AdminBiStreamService");
+    }
+  }
+
+  private static final class AdminBiStreamServiceFileDescriptorSupplier
+      extends AdminBiStreamServiceBaseDescriptorSupplier {
+    AdminBiStreamServiceFileDescriptorSupplier() {}
+  }
+
+  private static final class AdminBiStreamServiceMethodDescriptorSupplier
+      extends AdminBiStreamServiceBaseDescriptorSupplier
+      implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
+    private final String methodName;
+
+    AdminBiStreamServiceMethodDescriptorSupplier(String methodName) {
+      this.methodName = methodName;
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.MethodDescriptor 
getMethodDescriptor() {
+      return getServiceDescriptor().findMethodByName(methodName);
+    }
+  }
+
+  private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
+
+  public static io.grpc.ServiceDescriptor getServiceDescriptor() {
+    io.grpc.ServiceDescriptor result = serviceDescriptor;
+    if (result == null) {
+      synchronized (AdminBiStreamServiceGrpc.class) {
+        result = serviceDescriptor;
+        if (result == null) {
+          serviceDescriptor = result = 
io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
+              .setSchemaDescriptor(new 
AdminBiStreamServiceFileDescriptorSupplier())
+              .addMethod(getInvokeBiStreamMethod())
+              .build();
+        }
+      }
+    }
+    return result;
+  }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/AdminServiceGrpc.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/AdminServiceGrpc.java
new file mode 100644
index 000000000..0b1b7119b
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/AdminServiceGrpc.java
@@ -0,0 +1,276 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;
+
+import static io.grpc.MethodDescriptor.generateFullMethodName;
+
+/**
+ */
[email protected](
+    value = "by gRPC proto compiler (version 1.40.0)",
+    comments = "Source: event_mesh_admin_service.proto")
[email protected]
+public final class AdminServiceGrpc {
+
+  private AdminServiceGrpc() {}
+
+  public static final String SERVICE_NAME = "AdminService";
+
+  // Static method descriptors that strictly reflect the proto.
+  private static volatile io.grpc.MethodDescriptor<Payload,
+      Payload> getInvokeMethod;
+
+  @io.grpc.stub.annotations.RpcMethod(
+      fullMethodName = SERVICE_NAME + '/' + "invoke",
+      requestType = Payload.class,
+      responseType = Payload.class,
+      methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
+  public static io.grpc.MethodDescriptor<Payload,
+      Payload> getInvokeMethod() {
+    io.grpc.MethodDescriptor<Payload, Payload> getInvokeMethod;
+    if ((getInvokeMethod = AdminServiceGrpc.getInvokeMethod) == null) {
+      synchronized (AdminServiceGrpc.class) {
+        if ((getInvokeMethod = AdminServiceGrpc.getInvokeMethod) == null) {
+          AdminServiceGrpc.getInvokeMethod = getInvokeMethod =
+              io.grpc.MethodDescriptor.<Payload, Payload>newBuilder()
+              .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
+              .setFullMethodName(generateFullMethodName(SERVICE_NAME, 
"invoke"))
+              .setSampledToLocalTracing(true)
+              .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
+                  Payload.getDefaultInstance()))
+              .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
+                  Payload.getDefaultInstance()))
+              .setSchemaDescriptor(new 
AdminServiceMethodDescriptorSupplier("invoke"))
+              .build();
+        }
+      }
+    }
+    return getInvokeMethod;
+  }
+
+  /**
+   * Creates a new async stub that supports all call types for the service
+   */
+  public static AdminServiceStub newStub(io.grpc.Channel channel) {
+    io.grpc.stub.AbstractStub.StubFactory<AdminServiceStub> factory =
+      new io.grpc.stub.AbstractStub.StubFactory<AdminServiceStub>() {
+        @Override
+        public AdminServiceStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
+          return new AdminServiceStub(channel, callOptions);
+        }
+      };
+    return AdminServiceStub.newStub(factory, channel);
+  }
+
+  /**
+   * Creates a new blocking-style stub that supports unary and streaming 
output calls on the service
+   */
+  public static AdminServiceBlockingStub newBlockingStub(
+      io.grpc.Channel channel) {
+    io.grpc.stub.AbstractStub.StubFactory<AdminServiceBlockingStub> factory =
+      new io.grpc.stub.AbstractStub.StubFactory<AdminServiceBlockingStub>() {
+        @Override
+        public AdminServiceBlockingStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
+          return new AdminServiceBlockingStub(channel, callOptions);
+        }
+      };
+    return AdminServiceBlockingStub.newStub(factory, channel);
+  }
+
+  /**
+   * Creates a new ListenableFuture-style stub that supports unary calls on 
the service
+   */
+  public static AdminServiceFutureStub newFutureStub(
+      io.grpc.Channel channel) {
+    io.grpc.stub.AbstractStub.StubFactory<AdminServiceFutureStub> factory =
+      new io.grpc.stub.AbstractStub.StubFactory<AdminServiceFutureStub>() {
+        @Override
+        public AdminServiceFutureStub newStub(io.grpc.Channel channel, 
io.grpc.CallOptions callOptions) {
+          return new AdminServiceFutureStub(channel, callOptions);
+        }
+      };
+    return AdminServiceFutureStub.newStub(factory, channel);
+  }
+
+  /**
+   */
+  public static abstract class AdminServiceImplBase implements 
io.grpc.BindableService {
+
+    /**
+     */
+    public void invoke(Payload request,
+        io.grpc.stub.StreamObserver<Payload> responseObserver) {
+      io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall(getInvokeMethod(), 
responseObserver);
+    }
+
+    @Override public final io.grpc.ServerServiceDefinition bindService() {
+      return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
+          .addMethod(
+            getInvokeMethod(),
+            io.grpc.stub.ServerCalls.asyncUnaryCall(
+              new MethodHandlers<
+                Payload,
+                Payload>(
+                  this, METHODID_INVOKE)))
+          .build();
+    }
+  }
+
+  /**
+   */
+  public static final class AdminServiceStub extends 
io.grpc.stub.AbstractAsyncStub<AdminServiceStub> {
+    private AdminServiceStub(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      super(channel, callOptions);
+    }
+
+    @Override
+    protected AdminServiceStub build(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      return new AdminServiceStub(channel, callOptions);
+    }
+
+    /**
+     */
+    public void invoke(Payload request,
+        io.grpc.stub.StreamObserver<Payload> responseObserver) {
+      io.grpc.stub.ClientCalls.asyncUnaryCall(
+          getChannel().newCall(getInvokeMethod(), getCallOptions()), request, 
responseObserver);
+    }
+  }
+
+  /**
+   */
+  public static final class AdminServiceBlockingStub extends 
io.grpc.stub.AbstractBlockingStub<AdminServiceBlockingStub> {
+    private AdminServiceBlockingStub(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      super(channel, callOptions);
+    }
+
+    @Override
+    protected AdminServiceBlockingStub build(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      return new AdminServiceBlockingStub(channel, callOptions);
+    }
+
+    /**
+     */
+    public Payload invoke(Payload request) {
+      return io.grpc.stub.ClientCalls.blockingUnaryCall(
+          getChannel(), getInvokeMethod(), getCallOptions(), request);
+    }
+  }
+
+  /**
+   */
+  public static final class AdminServiceFutureStub extends 
io.grpc.stub.AbstractFutureStub<AdminServiceFutureStub> {
+    private AdminServiceFutureStub(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      super(channel, callOptions);
+    }
+
+    @Override
+    protected AdminServiceFutureStub build(
+        io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
+      return new AdminServiceFutureStub(channel, callOptions);
+    }
+
+    /**
+     */
+    public com.google.common.util.concurrent.ListenableFuture<Payload> invoke(
+        Payload request) {
+      return io.grpc.stub.ClientCalls.futureUnaryCall(
+          getChannel().newCall(getInvokeMethod(), getCallOptions()), request);
+    }
+  }
+
+  private static final int METHODID_INVOKE = 0;
+
+  private static final class MethodHandlers<Req, Resp> implements
+      io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
+      io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
+      io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
+      io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
+    private final AdminServiceImplBase serviceImpl;
+    private final int methodId;
+
+    MethodHandlers(AdminServiceImplBase serviceImpl, int methodId) {
+      this.serviceImpl = serviceImpl;
+      this.methodId = methodId;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> 
responseObserver) {
+      switch (methodId) {
+        case METHODID_INVOKE:
+          serviceImpl.invoke((Payload) request,
+              (io.grpc.stub.StreamObserver<Payload>) responseObserver);
+          break;
+        default:
+          throw new AssertionError();
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public io.grpc.stub.StreamObserver<Req> invoke(
+        io.grpc.stub.StreamObserver<Resp> responseObserver) {
+      switch (methodId) {
+        default:
+          throw new AssertionError();
+      }
+    }
+  }
+
+  private static abstract class AdminServiceBaseDescriptorSupplier
+      implements io.grpc.protobuf.ProtoFileDescriptorSupplier, 
io.grpc.protobuf.ProtoServiceDescriptorSupplier {
+    AdminServiceBaseDescriptorSupplier() {}
+
+    @Override
+    public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
+      return EventMeshAdminService.getDescriptor();
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.ServiceDescriptor 
getServiceDescriptor() {
+      return getFileDescriptor().findServiceByName("AdminService");
+    }
+  }
+
+  private static final class AdminServiceFileDescriptorSupplier
+      extends AdminServiceBaseDescriptorSupplier {
+    AdminServiceFileDescriptorSupplier() {}
+  }
+
+  private static final class AdminServiceMethodDescriptorSupplier
+      extends AdminServiceBaseDescriptorSupplier
+      implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
+    private final String methodName;
+
+    AdminServiceMethodDescriptorSupplier(String methodName) {
+      this.methodName = methodName;
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.MethodDescriptor 
getMethodDescriptor() {
+      return getServiceDescriptor().findMethodByName(methodName);
+    }
+  }
+
+  private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
+
+  public static io.grpc.ServiceDescriptor getServiceDescriptor() {
+    io.grpc.ServiceDescriptor result = serviceDescriptor;
+    if (result == null) {
+      synchronized (AdminServiceGrpc.class) {
+        result = serviceDescriptor;
+        if (result == null) {
+          serviceDescriptor = result = 
io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
+              .setSchemaDescriptor(new AdminServiceFileDescriptorSupplier())
+              .addMethod(getInvokeMethod())
+              .build();
+        }
+      }
+    }
+    return result;
+  }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/EventMeshAdminService.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/EventMeshAdminService.java
new file mode 100644
index 000000000..7d4c3ee1b
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/EventMeshAdminService.java
@@ -0,0 +1,77 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;// Generated by 
the protocol buffer compiler.  DO NOT EDIT!
+// source: event_mesh_admin_service.proto
+
+public final class EventMeshAdminService {
+  private EventMeshAdminService() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistryLite registry) {
+  }
+
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistry registry) {
+    registerAllExtensions(
+        (com.google.protobuf.ExtensionRegistryLite) registry);
+  }
+  static final com.google.protobuf.Descriptors.Descriptor
+    internal_static_Metadata_descriptor;
+  static final 
+    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      internal_static_Metadata_fieldAccessorTable;
+  static final com.google.protobuf.Descriptors.Descriptor
+    internal_static_Metadata_HeadersEntry_descriptor;
+  static final 
+    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      internal_static_Metadata_HeadersEntry_fieldAccessorTable;
+  static final com.google.protobuf.Descriptors.Descriptor
+    internal_static_Payload_descriptor;
+  static final 
+    com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      internal_static_Payload_fieldAccessorTable;
+
+  public static com.google.protobuf.Descriptors.FileDescriptor
+      getDescriptor() {
+    return descriptor;
+  }
+  private static  com.google.protobuf.Descriptors.FileDescriptor
+      descriptor;
+  static {
+    String[] descriptorData = {
+      "\n\036event_mesh_admin_service.proto\032\031google" +
+      "/protobuf/any.proto\"q\n\010Metadata\022\014\n\004type\030" +
+      "\003 \001(\t\022\'\n\007headers\030\007 \003(\0132\026.Metadata.Header" 
+
+      "sEntry\032.\n\014HeadersEntry\022\013\n\003key\030\001 
\001(\t\022\r\n\005v" +
+      "alue\030\002 
\001(\t:\0028\001\"J\n\007Payload\022\033\n\010metadata\030\002 " +
+      "\001(\0132\t.Metadata\022\"\n\004body\030\003 \001(\0132\024.google.pr" 
+
+      "otobuf.Any2B\n\024AdminBiStreamService\022*\n\016in" +
+      "vokeBiStream\022\010.Payload\032\010.Payload\"\000(\0010\00124" +
+      "\n\014AdminService\022$\n\014invokeStream\022\010.Payload" +
+      "\032\010.Payload\"\000B\002P\001b\006proto3"
+    };
+    descriptor = com.google.protobuf.Descriptors.FileDescriptor
+      .internalBuildGeneratedFileFrom(descriptorData,
+        new com.google.protobuf.Descriptors.FileDescriptor[] {
+          com.google.protobuf.AnyProto.getDescriptor(),
+        });
+    internal_static_Metadata_descriptor =
+      getDescriptor().getMessageTypes().get(0);
+    internal_static_Metadata_fieldAccessorTable = new
+      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+        internal_static_Metadata_descriptor,
+        new String[] { "Type", "Headers", });
+    internal_static_Metadata_HeadersEntry_descriptor =
+      internal_static_Metadata_descriptor.getNestedTypes().get(0);
+    internal_static_Metadata_HeadersEntry_fieldAccessorTable = new
+      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+        internal_static_Metadata_HeadersEntry_descriptor,
+        new String[] { "Key", "Value", });
+    internal_static_Payload_descriptor =
+      getDescriptor().getMessageTypes().get(1);
+    internal_static_Payload_fieldAccessorTable = new
+      com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+        internal_static_Payload_descriptor,
+        new String[] { "Metadata", "Body", });
+    com.google.protobuf.AnyProto.getDescriptor();
+  }
+
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/Metadata.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/Metadata.java
new file mode 100644
index 000000000..b7f11427b
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/Metadata.java
@@ -0,0 +1,841 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;// Generated by 
the protocol buffer compiler.  DO NOT EDIT!
+// source: event_mesh_admin_service.proto
+
+/**
+ * Protobuf type {@code Metadata}
+ */
+public final class Metadata extends
+    com.google.protobuf.GeneratedMessageV3 implements
+    // @@protoc_insertion_point(message_implements:Metadata)
+        MetadataOrBuilder {
+private static final long serialVersionUID = 0L;
+  // Use Metadata.newBuilder() to construct.
+  private Metadata(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
+    super(builder);
+  }
+  private Metadata() {
+    type_ = "";
+  }
+
+  @Override
+  @SuppressWarnings({"unused"})
+  protected Object newInstance(
+      UnusedPrivateParameter unused) {
+    return new Metadata();
+  }
+
+  @Override
+  public final com.google.protobuf.UnknownFieldSet
+  getUnknownFields() {
+    return this.unknownFields;
+  }
+  private Metadata(
+      com.google.protobuf.CodedInputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    this();
+    if (extensionRegistry == null) {
+      throw new NullPointerException();
+    }
+    int mutable_bitField0_ = 0;
+    com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+        com.google.protobuf.UnknownFieldSet.newBuilder();
+    try {
+      boolean done = false;
+      while (!done) {
+        int tag = input.readTag();
+        switch (tag) {
+          case 0:
+            done = true;
+            break;
+          case 26: {
+            String s = input.readStringRequireUtf8();
+
+            type_ = s;
+            break;
+          }
+          case 58: {
+            if (!((mutable_bitField0_ & 0x00000001) != 0)) {
+              headers_ = com.google.protobuf.MapField.newMapField(
+                  HeadersDefaultEntryHolder.defaultEntry);
+              mutable_bitField0_ |= 0x00000001;
+            }
+            com.google.protobuf.MapEntry<String, String>
+            headers__ = input.readMessage(
+                HeadersDefaultEntryHolder.defaultEntry.getParserForType(), 
extensionRegistry);
+            headers_.getMutableMap().put(
+                headers__.getKey(), headers__.getValue());
+            break;
+          }
+          default: {
+            if (!parseUnknownField(
+                input, unknownFields, extensionRegistry, tag)) {
+              done = true;
+            }
+            break;
+          }
+        }
+      }
+    } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+      throw e.setUnfinishedMessage(this);
+    } catch (java.io.IOException e) {
+      throw new com.google.protobuf.InvalidProtocolBufferException(
+          e).setUnfinishedMessage(this);
+    } finally {
+      this.unknownFields = unknownFields.build();
+      makeExtensionsImmutable();
+    }
+  }
+  public static final com.google.protobuf.Descriptors.Descriptor
+      getDescriptor() {
+    return EventMeshAdminService.internal_static_Metadata_descriptor;
+  }
+
+  @SuppressWarnings({"rawtypes"})
+  @Override
+  protected com.google.protobuf.MapField internalGetMapField(
+      int number) {
+    switch (number) {
+      case 7:
+        return internalGetHeaders();
+      default:
+        throw new RuntimeException(
+            "Invalid map field number: " + number);
+    }
+  }
+  @Override
+  protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      internalGetFieldAccessorTable() {
+    return EventMeshAdminService.internal_static_Metadata_fieldAccessorTable
+        .ensureFieldAccessorsInitialized(
+            Metadata.class, Builder.class);
+  }
+
+  public static final int TYPE_FIELD_NUMBER = 3;
+  private volatile Object type_;
+  /**
+   * <code>string type = 3;</code>
+   * @return The type.
+   */
+  @Override
+  public String getType() {
+    Object ref = type_;
+    if (ref instanceof String) {
+      return (String) ref;
+    } else {
+      com.google.protobuf.ByteString bs = 
+          (com.google.protobuf.ByteString) ref;
+      String s = bs.toStringUtf8();
+      type_ = s;
+      return s;
+    }
+  }
+  /**
+   * <code>string type = 3;</code>
+   * @return The bytes for type.
+   */
+  @Override
+  public com.google.protobuf.ByteString
+      getTypeBytes() {
+    Object ref = type_;
+    if (ref instanceof String) {
+      com.google.protobuf.ByteString b = 
+          com.google.protobuf.ByteString.copyFromUtf8(
+              (String) ref);
+      type_ = b;
+      return b;
+    } else {
+      return (com.google.protobuf.ByteString) ref;
+    }
+  }
+
+  public static final int HEADERS_FIELD_NUMBER = 7;
+  private static final class HeadersDefaultEntryHolder {
+    static final com.google.protobuf.MapEntry<
+        String, String> defaultEntry =
+            com.google.protobuf.MapEntry
+            .<String, String>newDefaultInstance(
+                
EventMeshAdminService.internal_static_Metadata_HeadersEntry_descriptor, 
+                com.google.protobuf.WireFormat.FieldType.STRING,
+                "",
+                com.google.protobuf.WireFormat.FieldType.STRING,
+                "");
+  }
+  private com.google.protobuf.MapField<
+      String, String> headers_;
+  private com.google.protobuf.MapField<String, String>
+  internalGetHeaders() {
+    if (headers_ == null) {
+      return com.google.protobuf.MapField.emptyMapField(
+          HeadersDefaultEntryHolder.defaultEntry);
+    }
+    return headers_;
+  }
+
+  public int getHeadersCount() {
+    return internalGetHeaders().getMap().size();
+  }
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+
+  @Override
+  public boolean containsHeaders(
+      String key) {
+    if (key == null) { throw new NullPointerException(); }
+    return internalGetHeaders().getMap().containsKey(key);
+  }
+  /**
+   * Use {@link #getHeadersMap()} instead.
+   */
+  @Override
+  @Deprecated
+  public java.util.Map<String, String> getHeaders() {
+    return getHeadersMap();
+  }
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+  @Override
+
+  public java.util.Map<String, String> getHeadersMap() {
+    return internalGetHeaders().getMap();
+  }
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+  @Override
+
+  public String getHeadersOrDefault(
+      String key,
+      String defaultValue) {
+    if (key == null) { throw new NullPointerException(); }
+    java.util.Map<String, String> map =
+        internalGetHeaders().getMap();
+    return map.containsKey(key) ? map.get(key) : defaultValue;
+  }
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+  @Override
+
+  public String getHeadersOrThrow(
+      String key) {
+    if (key == null) { throw new NullPointerException(); }
+    java.util.Map<String, String> map =
+        internalGetHeaders().getMap();
+    if (!map.containsKey(key)) {
+      throw new IllegalArgumentException();
+    }
+    return map.get(key);
+  }
+
+  private byte memoizedIsInitialized = -1;
+  @Override
+  public final boolean isInitialized() {
+    byte isInitialized = memoizedIsInitialized;
+    if (isInitialized == 1) return true;
+    if (isInitialized == 0) return false;
+
+    memoizedIsInitialized = 1;
+    return true;
+  }
+
+  @Override
+  public void writeTo(com.google.protobuf.CodedOutputStream output)
+                      throws java.io.IOException {
+    if (!getTypeBytes().isEmpty()) {
+      com.google.protobuf.GeneratedMessageV3.writeString(output, 3, type_);
+    }
+    com.google.protobuf.GeneratedMessageV3
+      .serializeStringMapTo(
+        output,
+        internalGetHeaders(),
+        HeadersDefaultEntryHolder.defaultEntry,
+        7);
+    unknownFields.writeTo(output);
+  }
+
+  @Override
+  public int getSerializedSize() {
+    int size = memoizedSize;
+    if (size != -1) return size;
+
+    size = 0;
+    if (!getTypeBytes().isEmpty()) {
+      size += com.google.protobuf.GeneratedMessageV3.computeStringSize(3, 
type_);
+    }
+    for (java.util.Map.Entry<String, String> entry
+         : internalGetHeaders().getMap().entrySet()) {
+      com.google.protobuf.MapEntry<String, String>
+      headers__ = HeadersDefaultEntryHolder.defaultEntry.newBuilderForType()
+          .setKey(entry.getKey())
+          .setValue(entry.getValue())
+          .build();
+      size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(7, headers__);
+    }
+    size += unknownFields.getSerializedSize();
+    memoizedSize = size;
+    return size;
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj == this) {
+     return true;
+    }
+    if (!(obj instanceof Metadata)) {
+      return super.equals(obj);
+    }
+    Metadata other = (Metadata) obj;
+
+    if (!getType()
+        .equals(other.getType())) return false;
+    if (!internalGetHeaders().equals(
+        other.internalGetHeaders())) return false;
+    if (!unknownFields.equals(other.unknownFields)) return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    if (memoizedHashCode != 0) {
+      return memoizedHashCode;
+    }
+    int hash = 41;
+    hash = (19 * hash) + getDescriptor().hashCode();
+    hash = (37 * hash) + TYPE_FIELD_NUMBER;
+    hash = (53 * hash) + getType().hashCode();
+    if (!internalGetHeaders().getMap().isEmpty()) {
+      hash = (37 * hash) + HEADERS_FIELD_NUMBER;
+      hash = (53 * hash) + internalGetHeaders().hashCode();
+    }
+    hash = (29 * hash) + unknownFields.hashCode();
+    memoizedHashCode = hash;
+    return hash;
+  }
+
+  public static Metadata parseFrom(
+      java.nio.ByteBuffer data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Metadata parseFrom(
+      java.nio.ByteBuffer data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Metadata parseFrom(
+      com.google.protobuf.ByteString data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Metadata parseFrom(
+      com.google.protobuf.ByteString data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Metadata parseFrom(byte[] data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Metadata parseFrom(
+      byte[] data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Metadata parseFrom(java.io.InputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input);
+  }
+  public static Metadata parseFrom(
+      java.io.InputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+  public static Metadata parseDelimitedFrom(java.io.InputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseDelimitedWithIOException(PARSER, input);
+  }
+  public static Metadata parseDelimitedFrom(
+      java.io.InputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+  }
+  public static Metadata parseFrom(
+      com.google.protobuf.CodedInputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input);
+  }
+  public static Metadata parseFrom(
+      com.google.protobuf.CodedInputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+
+  @Override
+  public Builder newBuilderForType() { return newBuilder(); }
+  public static Builder newBuilder() {
+    return DEFAULT_INSTANCE.toBuilder();
+  }
+  public static Builder newBuilder(Metadata prototype) {
+    return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+  }
+  @Override
+  public Builder toBuilder() {
+    return this == DEFAULT_INSTANCE
+        ? new Builder() : new Builder().mergeFrom(this);
+  }
+
+  @Override
+  protected Builder newBuilderForType(
+      com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+    Builder builder = new Builder(parent);
+    return builder;
+  }
+  /**
+   * Protobuf type {@code Metadata}
+   */
+  public static final class Builder extends
+      com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
+      // @@protoc_insertion_point(builder_implements:Metadata)
+          MetadataOrBuilder {
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return EventMeshAdminService.internal_static_Metadata_descriptor;
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    protected com.google.protobuf.MapField internalGetMapField(
+        int number) {
+      switch (number) {
+        case 7:
+          return internalGetHeaders();
+        default:
+          throw new RuntimeException(
+              "Invalid map field number: " + number);
+      }
+    }
+    @SuppressWarnings({"rawtypes"})
+    protected com.google.protobuf.MapField internalGetMutableMapField(
+        int number) {
+      switch (number) {
+        case 7:
+          return internalGetMutableHeaders();
+        default:
+          throw new RuntimeException(
+              "Invalid map field number: " + number);
+      }
+    }
+    @Override
+    protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return EventMeshAdminService.internal_static_Metadata_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              Metadata.class, Builder.class);
+    }
+
+    // Construct using Metadata.newBuilder()
+    private Builder() {
+      maybeForceBuilderInitialization();
+    }
+
+    private Builder(
+        com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+      super(parent);
+      maybeForceBuilderInitialization();
+    }
+    private void maybeForceBuilderInitialization() {
+      if (com.google.protobuf.GeneratedMessageV3
+              .alwaysUseFieldBuilders) {
+      }
+    }
+    @Override
+    public Builder clear() {
+      super.clear();
+      type_ = "";
+
+      internalGetMutableHeaders().clear();
+      return this;
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.Descriptor
+        getDescriptorForType() {
+      return EventMeshAdminService.internal_static_Metadata_descriptor;
+    }
+
+    @Override
+    public Metadata getDefaultInstanceForType() {
+      return Metadata.getDefaultInstance();
+    }
+
+    @Override
+    public Metadata build() {
+      Metadata result = buildPartial();
+      if (!result.isInitialized()) {
+        throw newUninitializedMessageException(result);
+      }
+      return result;
+    }
+
+    @Override
+    public Metadata buildPartial() {
+      Metadata result = new Metadata(this);
+      int from_bitField0_ = bitField0_;
+      result.type_ = type_;
+      result.headers_ = internalGetHeaders();
+      result.headers_.makeImmutable();
+      onBuilt();
+      return result;
+    }
+
+    @Override
+    public Builder clone() {
+      return super.clone();
+    }
+    @Override
+    public Builder setField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        Object value) {
+      return super.setField(field, value);
+    }
+    @Override
+    public Builder clearField(
+        com.google.protobuf.Descriptors.FieldDescriptor field) {
+      return super.clearField(field);
+    }
+    @Override
+    public Builder clearOneof(
+        com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+      return super.clearOneof(oneof);
+    }
+    @Override
+    public Builder setRepeatedField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        int index, Object value) {
+      return super.setRepeatedField(field, index, value);
+    }
+    @Override
+    public Builder addRepeatedField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        Object value) {
+      return super.addRepeatedField(field, value);
+    }
+    @Override
+    public Builder mergeFrom(com.google.protobuf.Message other) {
+      if (other instanceof Metadata) {
+        return mergeFrom((Metadata)other);
+      } else {
+        super.mergeFrom(other);
+        return this;
+      }
+    }
+
+    public Builder mergeFrom(Metadata other) {
+      if (other == Metadata.getDefaultInstance()) return this;
+      if (!other.getType().isEmpty()) {
+        type_ = other.type_;
+        onChanged();
+      }
+      internalGetMutableHeaders().mergeFrom(
+          other.internalGetHeaders());
+      this.mergeUnknownFields(other.unknownFields);
+      onChanged();
+      return this;
+    }
+
+    @Override
+    public final boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Builder mergeFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Metadata parsedMessage = null;
+      try {
+        parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        parsedMessage = (Metadata) e.getUnfinishedMessage();
+        throw e.unwrapIOException();
+      } finally {
+        if (parsedMessage != null) {
+          mergeFrom(parsedMessage);
+        }
+      }
+      return this;
+    }
+    private int bitField0_;
+
+    private Object type_ = "";
+    /**
+     * <code>string type = 3;</code>
+     * @return The type.
+     */
+    public String getType() {
+      Object ref = type_;
+      if (!(ref instanceof String)) {
+        com.google.protobuf.ByteString bs =
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        type_ = s;
+        return s;
+      } else {
+        return (String) ref;
+      }
+    }
+    /**
+     * <code>string type = 3;</code>
+     * @return The bytes for type.
+     */
+    public com.google.protobuf.ByteString
+        getTypeBytes() {
+      Object ref = type_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (String) ref);
+        type_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    /**
+     * <code>string type = 3;</code>
+     * @param value The type to set.
+     * @return This builder for chaining.
+     */
+    public Builder setType(
+        String value) {
+      if (value == null) {
+    throw new NullPointerException();
+  }
+  
+      type_ = value;
+      onChanged();
+      return this;
+    }
+    /**
+     * <code>string type = 3;</code>
+     * @return This builder for chaining.
+     */
+    public Builder clearType() {
+      
+      type_ = getDefaultInstance().getType();
+      onChanged();
+      return this;
+    }
+    /**
+     * <code>string type = 3;</code>
+     * @param value The bytes for type to set.
+     * @return This builder for chaining.
+     */
+    public Builder setTypeBytes(
+        com.google.protobuf.ByteString value) {
+      if (value == null) {
+    throw new NullPointerException();
+  }
+  checkByteStringIsUtf8(value);
+      
+      type_ = value;
+      onChanged();
+      return this;
+    }
+
+    private com.google.protobuf.MapField<
+        String, String> headers_;
+    private com.google.protobuf.MapField<String, String>
+    internalGetHeaders() {
+      if (headers_ == null) {
+        return com.google.protobuf.MapField.emptyMapField(
+            HeadersDefaultEntryHolder.defaultEntry);
+      }
+      return headers_;
+    }
+    private com.google.protobuf.MapField<String, String>
+    internalGetMutableHeaders() {
+      onChanged();;
+      if (headers_ == null) {
+        headers_ = com.google.protobuf.MapField.newMapField(
+            HeadersDefaultEntryHolder.defaultEntry);
+      }
+      if (!headers_.isMutable()) {
+        headers_ = headers_.copy();
+      }
+      return headers_;
+    }
+
+    public int getHeadersCount() {
+      return internalGetHeaders().getMap().size();
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+
+    @Override
+    public boolean containsHeaders(
+        String key) {
+      if (key == null) { throw new NullPointerException(); }
+      return internalGetHeaders().getMap().containsKey(key);
+    }
+    /**
+     * Use {@link #getHeadersMap()} instead.
+     */
+    @Override
+    @Deprecated
+    public java.util.Map<String, String> getHeaders() {
+      return getHeadersMap();
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+    @Override
+
+    public java.util.Map<String, String> getHeadersMap() {
+      return internalGetHeaders().getMap();
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+    @Override
+
+    public String getHeadersOrDefault(
+        String key,
+        String defaultValue) {
+      if (key == null) { throw new NullPointerException(); }
+      java.util.Map<String, String> map =
+          internalGetHeaders().getMap();
+      return map.containsKey(key) ? map.get(key) : defaultValue;
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+    @Override
+
+    public String getHeadersOrThrow(
+        String key) {
+      if (key == null) { throw new NullPointerException(); }
+      java.util.Map<String, String> map =
+          internalGetHeaders().getMap();
+      if (!map.containsKey(key)) {
+        throw new IllegalArgumentException();
+      }
+      return map.get(key);
+    }
+
+    public Builder clearHeaders() {
+      internalGetMutableHeaders().getMutableMap()
+          .clear();
+      return this;
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+
+    public Builder removeHeaders(
+        String key) {
+      if (key == null) { throw new NullPointerException(); }
+      internalGetMutableHeaders().getMutableMap()
+          .remove(key);
+      return this;
+    }
+    /**
+     * Use alternate mutation accessors instead.
+     */
+    @Deprecated
+    public java.util.Map<String, String>
+    getMutableHeaders() {
+      return internalGetMutableHeaders().getMutableMap();
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+    public Builder putHeaders(
+        String key,
+        String value) {
+      if (key == null) { throw new NullPointerException(); }
+      if (value == null) { throw new NullPointerException(); }
+      internalGetMutableHeaders().getMutableMap()
+          .put(key, value);
+      return this;
+    }
+    /**
+     * <code>map&lt;string, string&gt; headers = 7;</code>
+     */
+
+    public Builder putAllHeaders(
+        java.util.Map<String, String> values) {
+      internalGetMutableHeaders().getMutableMap()
+          .putAll(values);
+      return this;
+    }
+    @Override
+    public final Builder setUnknownFields(
+        final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.setUnknownFields(unknownFields);
+    }
+
+    @Override
+    public final Builder mergeUnknownFields(
+        final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.mergeUnknownFields(unknownFields);
+    }
+
+
+    // @@protoc_insertion_point(builder_scope:Metadata)
+  }
+
+  // @@protoc_insertion_point(class_scope:Metadata)
+  private static final Metadata DEFAULT_INSTANCE;
+  static {
+    DEFAULT_INSTANCE = new Metadata();
+  }
+
+  public static Metadata getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  private static final com.google.protobuf.Parser<Metadata>
+      PARSER = new com.google.protobuf.AbstractParser<Metadata>() {
+    @Override
+    public Metadata parsePartialFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return new Metadata(input, extensionRegistry);
+    }
+  };
+
+  public static com.google.protobuf.Parser<Metadata> parser() {
+    return PARSER;
+  }
+
+  @Override
+  public com.google.protobuf.Parser<Metadata> getParserForType() {
+    return PARSER;
+  }
+
+  @Override
+  public Metadata getDefaultInstanceForType() {
+    return DEFAULT_INSTANCE;
+  }
+
+}
+
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/MetadataOrBuilder.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/MetadataOrBuilder.java
new file mode 100644
index 000000000..3af900a5c
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/MetadataOrBuilder.java
@@ -0,0 +1,53 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;// Generated by 
the protocol buffer compiler.  DO NOT EDIT!
+// source: event_mesh_admin_service.proto
+
+public interface MetadataOrBuilder extends
+    // @@protoc_insertion_point(interface_extends:Metadata)
+    com.google.protobuf.MessageOrBuilder {
+
+  /**
+   * <code>string type = 3;</code>
+   * @return The type.
+   */
+  String getType();
+  /**
+   * <code>string type = 3;</code>
+   * @return The bytes for type.
+   */
+  com.google.protobuf.ByteString
+      getTypeBytes();
+
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+  int getHeadersCount();
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+  boolean containsHeaders(
+      String key);
+  /**
+   * Use {@link #getHeadersMap()} instead.
+   */
+  @Deprecated
+  java.util.Map<String, String>
+  getHeaders();
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+  java.util.Map<String, String>
+  getHeadersMap();
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+
+  String getHeadersOrDefault(
+      String key,
+      String defaultValue);
+  /**
+   * <code>map&lt;string, string&gt; headers = 7;</code>
+   */
+
+  String getHeadersOrThrow(
+      String key);
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/Payload.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/Payload.java
new file mode 100644
index 000000000..d16b491ef
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/Payload.java
@@ -0,0 +1,793 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;// Generated by 
the protocol buffer compiler.  DO NOT EDIT!
+// source: event_mesh_admin_service.proto
+
+/**
+ * Protobuf type {@code Payload}
+ */
+public final class Payload extends
+    com.google.protobuf.GeneratedMessageV3 implements
+    // @@protoc_insertion_point(message_implements:Payload)
+    PayloadOrBuilder {
+private static final long serialVersionUID = 0L;
+  // Use Payload.newBuilder() to construct.
+  private Payload(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
+    super(builder);
+  }
+  private Payload() {
+  }
+
+  @Override
+  @SuppressWarnings({"unused"})
+  protected Object newInstance(
+      UnusedPrivateParameter unused) {
+    return new Payload();
+  }
+
+  @Override
+  public final com.google.protobuf.UnknownFieldSet
+  getUnknownFields() {
+    return this.unknownFields;
+  }
+  private Payload(
+      com.google.protobuf.CodedInputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    this();
+    if (extensionRegistry == null) {
+      throw new NullPointerException();
+    }
+    com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+        com.google.protobuf.UnknownFieldSet.newBuilder();
+    try {
+      boolean done = false;
+      while (!done) {
+        int tag = input.readTag();
+        switch (tag) {
+          case 0:
+            done = true;
+            break;
+          case 18: {
+            Metadata.Builder subBuilder = null;
+            if (metadata_ != null) {
+              subBuilder = metadata_.toBuilder();
+            }
+            metadata_ = input.readMessage(Metadata.parser(), 
extensionRegistry);
+            if (subBuilder != null) {
+              subBuilder.mergeFrom(metadata_);
+              metadata_ = subBuilder.buildPartial();
+            }
+
+            break;
+          }
+          case 26: {
+            com.google.protobuf.Any.Builder subBuilder = null;
+            if (body_ != null) {
+              subBuilder = body_.toBuilder();
+            }
+            body_ = input.readMessage(com.google.protobuf.Any.parser(), 
extensionRegistry);
+            if (subBuilder != null) {
+              subBuilder.mergeFrom(body_);
+              body_ = subBuilder.buildPartial();
+            }
+
+            break;
+          }
+          default: {
+            if (!parseUnknownField(
+                input, unknownFields, extensionRegistry, tag)) {
+              done = true;
+            }
+            break;
+          }
+        }
+      }
+    } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+      throw e.setUnfinishedMessage(this);
+    } catch (java.io.IOException e) {
+      throw new com.google.protobuf.InvalidProtocolBufferException(
+          e).setUnfinishedMessage(this);
+    } finally {
+      this.unknownFields = unknownFields.build();
+      makeExtensionsImmutable();
+    }
+  }
+  public static final com.google.protobuf.Descriptors.Descriptor
+      getDescriptor() {
+    return EventMeshAdminService.internal_static_Payload_descriptor;
+  }
+
+  @Override
+  protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+      internalGetFieldAccessorTable() {
+    return EventMeshAdminService.internal_static_Payload_fieldAccessorTable
+        .ensureFieldAccessorsInitialized(
+            Payload.class, Builder.class);
+  }
+
+  public static final int METADATA_FIELD_NUMBER = 2;
+  private Metadata metadata_;
+  /**
+   * <code>.Metadata metadata = 2;</code>
+   * @return Whether the metadata field is set.
+   */
+  @Override
+  public boolean hasMetadata() {
+    return metadata_ != null;
+  }
+  /**
+   * <code>.Metadata metadata = 2;</code>
+   * @return The metadata.
+   */
+  @Override
+  public Metadata getMetadata() {
+    return metadata_ == null ? Metadata.getDefaultInstance() : metadata_;
+  }
+  /**
+   * <code>.Metadata metadata = 2;</code>
+   */
+  @Override
+  public MetadataOrBuilder getMetadataOrBuilder() {
+    return getMetadata();
+  }
+
+  public static final int BODY_FIELD_NUMBER = 3;
+  private com.google.protobuf.Any body_;
+  /**
+   * <code>.google.protobuf.Any body = 3;</code>
+   * @return Whether the body field is set.
+   */
+  @Override
+  public boolean hasBody() {
+    return body_ != null;
+  }
+  /**
+   * <code>.google.protobuf.Any body = 3;</code>
+   * @return The body.
+   */
+  @Override
+  public com.google.protobuf.Any getBody() {
+    return body_ == null ? com.google.protobuf.Any.getDefaultInstance() : 
body_;
+  }
+  /**
+   * <code>.google.protobuf.Any body = 3;</code>
+   */
+  @Override
+  public com.google.protobuf.AnyOrBuilder getBodyOrBuilder() {
+    return getBody();
+  }
+
+  private byte memoizedIsInitialized = -1;
+  @Override
+  public final boolean isInitialized() {
+    byte isInitialized = memoizedIsInitialized;
+    if (isInitialized == 1) return true;
+    if (isInitialized == 0) return false;
+
+    memoizedIsInitialized = 1;
+    return true;
+  }
+
+  @Override
+  public void writeTo(com.google.protobuf.CodedOutputStream output)
+                      throws java.io.IOException {
+    if (metadata_ != null) {
+      output.writeMessage(2, getMetadata());
+    }
+    if (body_ != null) {
+      output.writeMessage(3, getBody());
+    }
+    unknownFields.writeTo(output);
+  }
+
+  @Override
+  public int getSerializedSize() {
+    int size = memoizedSize;
+    if (size != -1) return size;
+
+    size = 0;
+    if (metadata_ != null) {
+      size += com.google.protobuf.CodedOutputStream
+        .computeMessageSize(2, getMetadata());
+    }
+    if (body_ != null) {
+      size += com.google.protobuf.CodedOutputStream
+        .computeMessageSize(3, getBody());
+    }
+    size += unknownFields.getSerializedSize();
+    memoizedSize = size;
+    return size;
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (obj == this) {
+     return true;
+    }
+    if (!(obj instanceof Payload)) {
+      return super.equals(obj);
+    }
+    Payload other = (Payload) obj;
+
+    if (hasMetadata() != other.hasMetadata()) return false;
+    if (hasMetadata()) {
+      if (!getMetadata()
+          .equals(other.getMetadata())) return false;
+    }
+    if (hasBody() != other.hasBody()) return false;
+    if (hasBody()) {
+      if (!getBody()
+          .equals(other.getBody())) return false;
+    }
+    if (!unknownFields.equals(other.unknownFields)) return false;
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    if (memoizedHashCode != 0) {
+      return memoizedHashCode;
+    }
+    int hash = 41;
+    hash = (19 * hash) + getDescriptor().hashCode();
+    if (hasMetadata()) {
+      hash = (37 * hash) + METADATA_FIELD_NUMBER;
+      hash = (53 * hash) + getMetadata().hashCode();
+    }
+    if (hasBody()) {
+      hash = (37 * hash) + BODY_FIELD_NUMBER;
+      hash = (53 * hash) + getBody().hashCode();
+    }
+    hash = (29 * hash) + unknownFields.hashCode();
+    memoizedHashCode = hash;
+    return hash;
+  }
+
+  public static Payload parseFrom(
+      java.nio.ByteBuffer data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Payload parseFrom(
+      java.nio.ByteBuffer data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Payload parseFrom(
+      com.google.protobuf.ByteString data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Payload parseFrom(
+      com.google.protobuf.ByteString data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Payload parseFrom(byte[] data)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data);
+  }
+  public static Payload parseFrom(
+      byte[] data,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws com.google.protobuf.InvalidProtocolBufferException {
+    return PARSER.parseFrom(data, extensionRegistry);
+  }
+  public static Payload parseFrom(java.io.InputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input);
+  }
+  public static Payload parseFrom(
+      java.io.InputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+  public static Payload parseDelimitedFrom(java.io.InputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseDelimitedWithIOException(PARSER, input);
+  }
+  public static Payload parseDelimitedFrom(
+      java.io.InputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+  }
+  public static Payload parseFrom(
+      com.google.protobuf.CodedInputStream input)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input);
+  }
+  public static Payload parseFrom(
+      com.google.protobuf.CodedInputStream input,
+      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+      throws java.io.IOException {
+    return com.google.protobuf.GeneratedMessageV3
+        .parseWithIOException(PARSER, input, extensionRegistry);
+  }
+
+  @Override
+  public Builder newBuilderForType() { return newBuilder(); }
+  public static Builder newBuilder() {
+    return DEFAULT_INSTANCE.toBuilder();
+  }
+  public static Builder newBuilder(Payload prototype) {
+    return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+  }
+  @Override
+  public Builder toBuilder() {
+    return this == DEFAULT_INSTANCE
+        ? new Builder() : new Builder().mergeFrom(this);
+  }
+
+  @Override
+  protected Builder newBuilderForType(
+      com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+    Builder builder = new Builder(parent);
+    return builder;
+  }
+  /**
+   * Protobuf type {@code Payload}
+   */
+  public static final class Builder extends
+      com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
+      // @@protoc_insertion_point(builder_implements:Payload)
+      PayloadOrBuilder {
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return EventMeshAdminService.internal_static_Payload_descriptor;
+    }
+
+    @Override
+    protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return EventMeshAdminService.internal_static_Payload_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              Payload.class, Builder.class);
+    }
+
+    // Construct using Payload.newBuilder()
+    private Builder() {
+      maybeForceBuilderInitialization();
+    }
+
+    private Builder(
+        com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+      super(parent);
+      maybeForceBuilderInitialization();
+    }
+    private void maybeForceBuilderInitialization() {
+      if (com.google.protobuf.GeneratedMessageV3
+              .alwaysUseFieldBuilders) {
+      }
+    }
+    @Override
+    public Builder clear() {
+      super.clear();
+      if (metadataBuilder_ == null) {
+        metadata_ = null;
+      } else {
+        metadata_ = null;
+        metadataBuilder_ = null;
+      }
+      if (bodyBuilder_ == null) {
+        body_ = null;
+      } else {
+        body_ = null;
+        bodyBuilder_ = null;
+      }
+      return this;
+    }
+
+    @Override
+    public com.google.protobuf.Descriptors.Descriptor
+        getDescriptorForType() {
+      return EventMeshAdminService.internal_static_Payload_descriptor;
+    }
+
+    @Override
+    public Payload getDefaultInstanceForType() {
+      return Payload.getDefaultInstance();
+    }
+
+    @Override
+    public Payload build() {
+      Payload result = buildPartial();
+      if (!result.isInitialized()) {
+        throw newUninitializedMessageException(result);
+      }
+      return result;
+    }
+
+    @Override
+    public Payload buildPartial() {
+      Payload result = new Payload(this);
+      if (metadataBuilder_ == null) {
+        result.metadata_ = metadata_;
+      } else {
+        result.metadata_ = metadataBuilder_.build();
+      }
+      if (bodyBuilder_ == null) {
+        result.body_ = body_;
+      } else {
+        result.body_ = bodyBuilder_.build();
+      }
+      onBuilt();
+      return result;
+    }
+
+    @Override
+    public Builder clone() {
+      return super.clone();
+    }
+    @Override
+    public Builder setField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        Object value) {
+      return super.setField(field, value);
+    }
+    @Override
+    public Builder clearField(
+        com.google.protobuf.Descriptors.FieldDescriptor field) {
+      return super.clearField(field);
+    }
+    @Override
+    public Builder clearOneof(
+        com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+      return super.clearOneof(oneof);
+    }
+    @Override
+    public Builder setRepeatedField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        int index, Object value) {
+      return super.setRepeatedField(field, index, value);
+    }
+    @Override
+    public Builder addRepeatedField(
+        com.google.protobuf.Descriptors.FieldDescriptor field,
+        Object value) {
+      return super.addRepeatedField(field, value);
+    }
+    @Override
+    public Builder mergeFrom(com.google.protobuf.Message other) {
+      if (other instanceof Payload) {
+        return mergeFrom((Payload)other);
+      } else {
+        super.mergeFrom(other);
+        return this;
+      }
+    }
+
+    public Builder mergeFrom(Payload other) {
+      if (other == Payload.getDefaultInstance()) return this;
+      if (other.hasMetadata()) {
+        mergeMetadata(other.getMetadata());
+      }
+      if (other.hasBody()) {
+        mergeBody(other.getBody());
+      }
+      this.mergeUnknownFields(other.unknownFields);
+      onChanged();
+      return this;
+    }
+
+    @Override
+    public final boolean isInitialized() {
+      return true;
+    }
+
+    @Override
+    public Builder mergeFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Payload parsedMessage = null;
+      try {
+        parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        parsedMessage = (Payload) e.getUnfinishedMessage();
+        throw e.unwrapIOException();
+      } finally {
+        if (parsedMessage != null) {
+          mergeFrom(parsedMessage);
+        }
+      }
+      return this;
+    }
+
+    private Metadata metadata_;
+    private com.google.protobuf.SingleFieldBuilderV3<
+        Metadata, Metadata.Builder, MetadataOrBuilder> metadataBuilder_;
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     * @return Whether the metadata field is set.
+     */
+    public boolean hasMetadata() {
+      return metadataBuilder_ != null || metadata_ != null;
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     * @return The metadata.
+     */
+    public Metadata getMetadata() {
+      if (metadataBuilder_ == null) {
+        return metadata_ == null ? Metadata.getDefaultInstance() : metadata_;
+      } else {
+        return metadataBuilder_.getMessage();
+      }
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    public Builder setMetadata(Metadata value) {
+      if (metadataBuilder_ == null) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        metadata_ = value;
+        onChanged();
+      } else {
+        metadataBuilder_.setMessage(value);
+      }
+
+      return this;
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    public Builder setMetadata(
+        Metadata.Builder builderForValue) {
+      if (metadataBuilder_ == null) {
+        metadata_ = builderForValue.build();
+        onChanged();
+      } else {
+        metadataBuilder_.setMessage(builderForValue.build());
+      }
+
+      return this;
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    public Builder mergeMetadata(Metadata value) {
+      if (metadataBuilder_ == null) {
+        if (metadata_ != null) {
+          metadata_ =
+            Metadata.newBuilder(metadata_).mergeFrom(value).buildPartial();
+        } else {
+          metadata_ = value;
+        }
+        onChanged();
+      } else {
+        metadataBuilder_.mergeFrom(value);
+      }
+
+      return this;
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    public Builder clearMetadata() {
+      if (metadataBuilder_ == null) {
+        metadata_ = null;
+        onChanged();
+      } else {
+        metadata_ = null;
+        metadataBuilder_ = null;
+      }
+
+      return this;
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    public Metadata.Builder getMetadataBuilder() {
+      
+      onChanged();
+      return getMetadataFieldBuilder().getBuilder();
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    public MetadataOrBuilder getMetadataOrBuilder() {
+      if (metadataBuilder_ != null) {
+        return metadataBuilder_.getMessageOrBuilder();
+      } else {
+        return metadata_ == null ?
+            Metadata.getDefaultInstance() : metadata_;
+      }
+    }
+    /**
+     * <code>.Metadata metadata = 2;</code>
+     */
+    private com.google.protobuf.SingleFieldBuilderV3<
+        Metadata, Metadata.Builder, MetadataOrBuilder> 
+        getMetadataFieldBuilder() {
+      if (metadataBuilder_ == null) {
+        metadataBuilder_ = new com.google.protobuf.SingleFieldBuilderV3<
+            Metadata, Metadata.Builder, MetadataOrBuilder>(
+                getMetadata(),
+                getParentForChildren(),
+                isClean());
+        metadata_ = null;
+      }
+      return metadataBuilder_;
+    }
+
+    private com.google.protobuf.Any body_;
+    private com.google.protobuf.SingleFieldBuilderV3<
+        com.google.protobuf.Any, com.google.protobuf.Any.Builder, 
com.google.protobuf.AnyOrBuilder> bodyBuilder_;
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     * @return Whether the body field is set.
+     */
+    public boolean hasBody() {
+      return bodyBuilder_ != null || body_ != null;
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     * @return The body.
+     */
+    public com.google.protobuf.Any getBody() {
+      if (bodyBuilder_ == null) {
+        return body_ == null ? com.google.protobuf.Any.getDefaultInstance() : 
body_;
+      } else {
+        return bodyBuilder_.getMessage();
+      }
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    public Builder setBody(com.google.protobuf.Any value) {
+      if (bodyBuilder_ == null) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        body_ = value;
+        onChanged();
+      } else {
+        bodyBuilder_.setMessage(value);
+      }
+
+      return this;
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    public Builder setBody(
+        com.google.protobuf.Any.Builder builderForValue) {
+      if (bodyBuilder_ == null) {
+        body_ = builderForValue.build();
+        onChanged();
+      } else {
+        bodyBuilder_.setMessage(builderForValue.build());
+      }
+
+      return this;
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    public Builder mergeBody(com.google.protobuf.Any value) {
+      if (bodyBuilder_ == null) {
+        if (body_ != null) {
+          body_ =
+            
com.google.protobuf.Any.newBuilder(body_).mergeFrom(value).buildPartial();
+        } else {
+          body_ = value;
+        }
+        onChanged();
+      } else {
+        bodyBuilder_.mergeFrom(value);
+      }
+
+      return this;
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    public Builder clearBody() {
+      if (bodyBuilder_ == null) {
+        body_ = null;
+        onChanged();
+      } else {
+        body_ = null;
+        bodyBuilder_ = null;
+      }
+
+      return this;
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    public com.google.protobuf.Any.Builder getBodyBuilder() {
+      
+      onChanged();
+      return getBodyFieldBuilder().getBuilder();
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    public com.google.protobuf.AnyOrBuilder getBodyOrBuilder() {
+      if (bodyBuilder_ != null) {
+        return bodyBuilder_.getMessageOrBuilder();
+      } else {
+        return body_ == null ?
+            com.google.protobuf.Any.getDefaultInstance() : body_;
+      }
+    }
+    /**
+     * <code>.google.protobuf.Any body = 3;</code>
+     */
+    private com.google.protobuf.SingleFieldBuilderV3<
+        com.google.protobuf.Any, com.google.protobuf.Any.Builder, 
com.google.protobuf.AnyOrBuilder> 
+        getBodyFieldBuilder() {
+      if (bodyBuilder_ == null) {
+        bodyBuilder_ = new com.google.protobuf.SingleFieldBuilderV3<
+            com.google.protobuf.Any, com.google.protobuf.Any.Builder, 
com.google.protobuf.AnyOrBuilder>(
+                getBody(),
+                getParentForChildren(),
+                isClean());
+        body_ = null;
+      }
+      return bodyBuilder_;
+    }
+    @Override
+    public final Builder setUnknownFields(
+        final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.setUnknownFields(unknownFields);
+    }
+
+    @Override
+    public final Builder mergeUnknownFields(
+        final com.google.protobuf.UnknownFieldSet unknownFields) {
+      return super.mergeUnknownFields(unknownFields);
+    }
+
+
+    // @@protoc_insertion_point(builder_scope:Payload)
+  }
+
+  // @@protoc_insertion_point(class_scope:Payload)
+  private static final Payload DEFAULT_INSTANCE;
+  static {
+    DEFAULT_INSTANCE = new Payload();
+  }
+
+  public static Payload getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  private static final com.google.protobuf.Parser<Payload>
+      PARSER = new com.google.protobuf.AbstractParser<Payload>() {
+    @Override
+    public Payload parsePartialFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return new Payload(input, extensionRegistry);
+    }
+  };
+
+  public static com.google.protobuf.Parser<Payload> parser() {
+    return PARSER;
+  }
+
+  @Override
+  public com.google.protobuf.Parser<Payload> getParserForType() {
+    return PARSER;
+  }
+
+  @Override
+  public Payload getDefaultInstanceForType() {
+    return DEFAULT_INSTANCE;
+  }
+
+}
+
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/PayloadOrBuilder.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/PayloadOrBuilder.java
new file mode 100644
index 000000000..ab9ad0091
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/grpc/adminserver/PayloadOrBuilder.java
@@ -0,0 +1,37 @@
+package org.apache.eventmesh.common.protocol.grpc.adminserver;// Generated by 
the protocol buffer compiler.  DO NOT EDIT!
+// source: event_mesh_admin_service.proto
+
+public interface PayloadOrBuilder extends
+    // @@protoc_insertion_point(interface_extends:Payload)
+    com.google.protobuf.MessageOrBuilder {
+
+  /**
+   * <code>.Metadata metadata = 2;</code>
+   * @return Whether the metadata field is set.
+   */
+  boolean hasMetadata();
+  /**
+   * <code>.Metadata metadata = 2;</code>
+   * @return The metadata.
+   */
+  Metadata getMetadata();
+  /**
+   * <code>.Metadata metadata = 2;</code>
+   */
+  MetadataOrBuilder getMetadataOrBuilder();
+
+  /**
+   * <code>.google.protobuf.Any body = 3;</code>
+   * @return Whether the body field is set.
+   */
+  boolean hasBody();
+  /**
+   * <code>.google.protobuf.Any body = 3;</code>
+   * @return The body.
+   */
+  com.google.protobuf.Any getBody();
+  /**
+   * <code>.google.protobuf.Any body = 3;</code>
+   */
+  com.google.protobuf.AnyOrBuilder getBodyOrBuilder();
+}
diff --git a/eventmesh-registry/.gitignore b/eventmesh-registry/.gitignore
new file mode 100644
index 000000000..b63da4551
--- /dev/null
+++ b/eventmesh-registry/.gitignore
@@ -0,0 +1,42 @@
+.gradle
+build/
+!gradle/wrapper/gradle-wrapper.jar
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+out/
+!**/src/main/**/out/
+!**/src/test/**/out/
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+bin/
+!**/src/main/**/bin/
+!**/src/test/**/bin/
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/eventmesh-registry/build.gradle b/eventmesh-registry/build.gradle
new file mode 100644
index 000000000..e69de29bb
diff --git a/eventmesh-registry/eventmesh-registry-api/build.gradle 
b/eventmesh-registry/eventmesh-registry-api/build.gradle
new file mode 100644
index 000000000..eb19172c5
--- /dev/null
+++ b/eventmesh-registry/eventmesh-registry-api/build.gradle
@@ -0,0 +1,8 @@
+dependencies {
+    implementation project(":eventmesh-spi")
+    implementation project(":eventmesh-common")
+    implementation "com.alibaba.nacos:nacos-client"
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java
similarity index 87%
rename from 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java
rename to 
eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java
index cdcc16979..f5e36677c 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/AbstractRegistryListener.java
@@ -1,4 +1,4 @@
-package com.apache.eventmesh.admin.server.registry;
+package org.apache.eventmesh.registry;
 
 public abstract class AbstractRegistryListener<T> implements RegistryListener {
     protected abstract boolean checkType(Object data);
diff --git 
a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/QueryInstances.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/QueryInstances.java
new file mode 100644
index 000000000..5f0de5ebf
--- /dev/null
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/QueryInstances.java
@@ -0,0 +1,13 @@
+package org.apache.eventmesh.registry;
+
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Data
+public class QueryInstances {
+    private String serviceName;
+    private boolean health;
+    private Map<String,String> extFields = new HashMap<>();
+}
diff --git 
a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegisterServerInfo.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegisterServerInfo.java
new file mode 100644
index 000000000..a46b846df
--- /dev/null
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegisterServerInfo.java
@@ -0,0 +1,41 @@
+package org.apache.eventmesh.registry;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@ToString
+public class RegisterServerInfo {
+    // different implementations will have different formats
+    @Getter
+    @Setter
+    private String serviceName;
+
+    @Getter
+    @Setter
+    private String address;
+
+    @Getter
+    @Setter
+    private boolean health;
+    @Getter
+    private Map<String,String> metadata = new HashMap<>();
+    @Getter
+    private Map<String, Object> extFields = new HashMap<>();
+
+    public void setMetadata(Map<String, String> metadata) {
+        if (metadata == null) {
+            this.metadata.clear();
+            return;
+        }
+
+        this.metadata = metadata;
+    }
+
+    public void addMetadata(String key, String value) {
+        this.metadata.put(key, value);
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/Registry.java
similarity index 73%
rename from 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java
rename to 
eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/Registry.java
index 771b45f2e..5a48f8c94 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/Registry.java
@@ -1,10 +1,11 @@
-package com.apache.eventmesh.admin.server.registry;
+package org.apache.eventmesh.registry;
 
-import com.apache.eventmesh.admin.server.AdminException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.registry.exception.RegistryException;
 import org.apache.eventmesh.spi.EventMeshExtensionFactory;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -34,7 +35,7 @@ public class Registry implements RegistryService {
     }
 
     @Override
-    public void init() throws AdminException {
+    public void init() throws RegistryException {
         if (initFlag.compareAndSet(false, true)) {
             return;
         }
@@ -42,7 +43,7 @@ public class Registry implements RegistryService {
     }
 
     @Override
-    public void shutdown() throws AdminException {
+    public void shutdown() throws RegistryException {
         if (shutdownFlag.compareAndSet(false, true)) {
             this.registryService.shutdown();
         }
@@ -59,12 +60,17 @@ public class Registry implements RegistryService {
     }
 
     @Override
-    public boolean register(EventMeshAdminServerRegisterInfo 
eventMeshRegisterInfo) throws AdminException {
-        return this.registryService.register(eventMeshRegisterInfo);
+    public List<RegisterServerInfo> selectInstances(QueryInstances serverInfo) 
{
+        return this.registryService.selectInstances(serverInfo);
     }
 
     @Override
-    public boolean unRegister(EventMeshAdminServerRegisterInfo 
eventMeshUnRegisterInfo) throws AdminException {
-        return this.registryService.unRegister(eventMeshUnRegisterInfo);
+    public boolean register(RegisterServerInfo registerInfo) throws 
RegistryException {
+        return this.registryService.register(registerInfo);
+    }
+
+    @Override
+    public boolean unRegister(RegisterServerInfo unRegisterInfo) throws 
RegistryException {
+        return this.registryService.unRegister(unRegisterInfo);
     }
 }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java
similarity index 57%
rename from 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java
rename to 
eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java
index 2d339497f..4f53e4b76 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryListener.java
@@ -1,4 +1,4 @@
-package com.apache.eventmesh.admin.server.registry;
+package org.apache.eventmesh.registry;
 
 public interface RegistryListener {
     void onChange(Object data);
diff --git 
a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryService.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryService.java
new file mode 100644
index 000000000..f549e136d
--- /dev/null
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/RegistryService.java
@@ -0,0 +1,26 @@
+package org.apache.eventmesh.registry;
+
+
+import org.apache.eventmesh.registry.exception.RegistryException;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+import java.util.List;
+
+@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
+public interface RegistryService {
+    String ConfigurationKey = "registry";
+    void init() throws RegistryException;
+
+    void shutdown() throws RegistryException;
+
+    void subscribe(RegistryListener registryListener, String serviceName);
+
+    void unsubscribe(RegistryListener registryListener, String serviceName);
+
+    List<RegisterServerInfo> selectInstances(QueryInstances serverInfo);
+
+    boolean register(RegisterServerInfo registerInfo) throws RegistryException;
+
+    boolean unRegister(RegisterServerInfo registerInfo) throws 
RegistryException;
+}
diff --git 
a/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/exception/RegistryException.java
 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/exception/RegistryException.java
new file mode 100644
index 000000000..d3ef24066
--- /dev/null
+++ 
b/eventmesh-registry/eventmesh-registry-api/src/main/java/org/apache/eventmesh/registry/exception/RegistryException.java
@@ -0,0 +1,11 @@
+package org.apache.eventmesh.registry.exception;
+
+public class RegistryException extends RuntimeException {
+    public RegistryException(String message) {
+        super(message);
+    }
+
+    public RegistryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/eventmesh-registry/eventmesh-registry-nacos/build.gradle 
b/eventmesh-registry/eventmesh-registry-nacos/build.gradle
new file mode 100644
index 000000000..967032f33
--- /dev/null
+++ b/eventmesh-registry/eventmesh-registry-nacos/build.gradle
@@ -0,0 +1,8 @@
+dependencies {
+    implementation "com.alibaba.nacos:nacos-client"
+    implementation project(":eventmesh-registry:eventmesh-registry-api")
+    implementation project(":eventmesh-common")
+
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+}
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java
 
b/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java
similarity index 63%
rename from 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java
rename to 
eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java
index cd4fb1103..dbb9a140c 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java
+++ 
b/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosDiscoveryService.java
@@ -1,23 +1,33 @@
-package com.apache.eventmesh.admin.server.registry;
+package org.apache.eventmesh.registry.nacos;
 
 import com.alibaba.nacos.api.NacosFactory;
 import com.alibaba.nacos.api.PropertyKeyConst;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.alibaba.nacos.api.naming.NamingService;
-import com.alibaba.nacos.api.naming.listener.Event;
 import com.alibaba.nacos.api.naming.listener.EventListener;
 import com.alibaba.nacos.api.naming.pojo.Instance;
 import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
+import com.alibaba.nacos.api.naming.utils.NamingUtils;
 import com.alibaba.nacos.client.naming.utils.UtilAndComs;
-import com.apache.eventmesh.admin.server.AdminException;
-import com.apache.eventmesh.admin.server.AdminServer;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eventmesh.common.config.CommonConfiguration;
 import org.apache.eventmesh.common.config.ConfigService;
 import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import org.apache.eventmesh.registry.QueryInstances;
+import org.apache.eventmesh.registry.RegisterServerInfo;
+import org.apache.eventmesh.registry.RegistryListener;
+import org.apache.eventmesh.registry.RegistryService;
+import org.apache.eventmesh.registry.exception.RegistryException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -27,7 +37,7 @@ import java.util.stream.Collectors;
 public class NacosDiscoveryService implements RegistryService {
     private final AtomicBoolean initFlag = new AtomicBoolean(false);
 
-    private EventMeshAdminServerConfiguration adminConf;
+    private CommonConfiguration configuration;
 
     private NacosRegistryConfiguration nacosConf;
 
@@ -39,19 +49,17 @@ public class NacosDiscoveryService implements 
RegistryService {
     private static final String GROUP_NAME = "admin";
 
     @Override
-    public void init() throws AdminException {
+    public void init() throws RegistryException {
         if (!initFlag.compareAndSet(false, true)) {
             return;
         }
-        CommonConfiguration configuration = 
ConfigurationContextUtil.get(AdminServer.ConfigurationKey);
-        if (!(configuration instanceof EventMeshAdminServerConfiguration)) {
-            throw new AdminException("registry config instance is null or not 
match type");
+        configuration = 
ConfigurationContextUtil.get(RegistryService.ConfigurationKey);
+        if (configuration == null ) {
+            throw new RegistryException("registry config instance is null");
         }
-
-        adminConf = (EventMeshAdminServerConfiguration)configuration;
-        NacosRegistryConfiguration nacosConf = 
ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class);
-        if (nacosConf != null) {
-            this.nacosConf = nacosConf;
+        nacosConf = 
ConfigService.getInstance().buildConfigInstance(NacosRegistryConfiguration.class);
+        if (nacosConf == null) {
+            log.info("nacos registry configuration is null");
         }
         Properties properties = buildProperties();
         // registry
@@ -59,15 +67,15 @@ public class NacosDiscoveryService implements 
RegistryService {
             this.namingService = NacosFactory.createNamingService(properties);
         } catch (NacosException e) {
             log.error("[NacosRegistryService][start] error", e);
-            throw new AdminException(e.getMessage());
+            throw new RegistryException(e.getMessage());
         }
     }
 
     private Properties buildProperties() {
         Properties properties = new Properties();
-        properties.setProperty(PropertyKeyConst.SERVER_ADDR, 
adminConf.getRegistryCenterAddr());
-        properties.setProperty(PropertyKeyConst.USERNAME, 
adminConf.getEventMeshRegistryPluginUsername());
-        properties.setProperty(PropertyKeyConst.PASSWORD, 
adminConf.getEventMeshRegistryPluginPassword());
+        properties.setProperty(PropertyKeyConst.SERVER_ADDR, 
configuration.getRegistryAddr());
+        properties.setProperty(PropertyKeyConst.USERNAME, 
configuration.getEventMeshRegistryPluginUsername());
+        properties.setProperty(PropertyKeyConst.PASSWORD, 
configuration.getEventMeshRegistryPluginPassword());
         if (nacosConf == null) {
             return properties;
         }
@@ -99,7 +107,7 @@ public class NacosDiscoveryService implements 
RegistryService {
     }
 
     @Override
-    public void shutdown() throws AdminException {
+    public void shutdown() throws RegistryException {
         if (this.namingService != null) {
             try {
                 namingService.shutDown();
@@ -161,36 +169,69 @@ public class NacosDiscoveryService implements 
RegistryService {
     }
 
     @Override
-    public boolean register(EventMeshAdminServerRegisterInfo 
eventMeshRegisterInfo) throws AdminException {
+    public List<RegisterServerInfo> selectInstances(QueryInstances 
queryInstances) {
+        ArrayList<RegisterServerInfo> list = new ArrayList<>();
+        try {
+            ServiceInfo serviceInfo = 
ServiceInfo.fromKey(queryInstances.getServiceName());
+            ArrayList<String> clusters = new ArrayList<>();
+            if (StringUtils.isNotBlank(serviceInfo.getClusters())) {
+                
clusters.addAll(Arrays.asList(serviceInfo.getClusters().split(",")));
+            }
+            List<Instance> instances = 
namingService.selectInstances(serviceInfo.getName(), 
serviceInfo.getGroupName(), clusters, queryInstances.isHealth());
+            if (instances != null) {
+                instances.forEach(x -> {
+                    RegisterServerInfo instanceInfo = new RegisterServerInfo();
+                    instanceInfo.setMetadata(x.getMetadata());
+                    instanceInfo.setHealth(x.isHealthy());
+                    instanceInfo.setAddress(x.getIp() + ":" + x.getPort());
+                    
instanceInfo.setServiceName(ServiceInfo.getKey(NamingUtils.getGroupedName(x.getServiceName(),
 serviceInfo.getGroupName()), x.getClusterName()));
+                    list.add(instanceInfo);
+                });
+            }
+            return list;
+        } catch (Exception e) {
+            log.error("select instance by query {} from nacos fail", 
queryInstances, e);
+            return list;
+        }
+    }
+
+    @Override
+    public boolean register(RegisterServerInfo eventMeshRegisterInfo) throws 
RegistryException {
         try {
             String[] ipPort = eventMeshRegisterInfo.getAddress().split(":");
             if (ipPort.length < 2) {
                 return false;
             }
+            ServiceInfo serviceInfo = 
ServiceInfo.fromKey(eventMeshRegisterInfo.getServiceName());
             Instance instance = new Instance();
-            
instance.setClusterName(eventMeshRegisterInfo.getEventMeshClusterName());
+            instance.setClusterName(serviceInfo.getClusters());
             instance.setEnabled(true);
             instance.setEphemeral(true);
-            instance.setHealthy(true);
+            instance.setHealthy(eventMeshRegisterInfo.isHealth());
             instance.setWeight(1.0);
             instance.setIp(ipPort[0]);
             instance.setPort(Integer.parseInt(ipPort[1]));
             instance.setMetadata(eventMeshRegisterInfo.getMetadata());
-            
namingService.registerInstance(eventMeshRegisterInfo.getEventMeshName(), 
GROUP_NAME, instance);
+            namingService.registerInstance(serviceInfo.getName(), 
serviceInfo.getGroupName(), instance);
             return true;
         } catch (Exception e) {
-            log.error("register instance service {} group {} cluster {} fail", 
eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, 
eventMeshRegisterInfo.getEventMeshClusterName(), e);
+            log.error("register instance service {} fail", 
eventMeshRegisterInfo, e);
             return false;
         }
     }
 
     @Override
-    public boolean unRegister(EventMeshAdminServerRegisterInfo 
eventMeshRegisterInfo) throws AdminException {
+    public boolean unRegister(RegisterServerInfo eventMeshRegisterInfo) throws 
RegistryException {
         try {
-            
namingService.registerInstance(eventMeshRegisterInfo.getEventMeshName(), 
GROUP_NAME, new Instance());
+            String[] ipPort = eventMeshRegisterInfo.getAddress().split(":");
+            if (ipPort.length < 2) {
+                return false;
+            }
+            ServiceInfo serviceInfo = 
ServiceInfo.fromKey(eventMeshRegisterInfo.getServiceName());
+            namingService.deregisterInstance(serviceInfo.getName(), 
serviceInfo.getGroupName(), ipPort[0], Integer.parseInt(ipPort[1]), 
serviceInfo.getClusters());
             return true;
         } catch (Exception e) {
-            log.error("register instance service {} group {} cluster {} fail", 
eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, 
eventMeshRegisterInfo.getEventMeshClusterName(), e);
+            log.error("unregister instance service {} fail", 
eventMeshRegisterInfo, e);
             return false;
         }
     }
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java
 
b/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosRegistryConfiguration.java
similarity index 97%
rename from 
eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java
rename to 
eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosRegistryConfiguration.java
index 45932e9fd..a8c473d27 100644
--- 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java
+++ 
b/eventmesh-registry/eventmesh-registry-nacos/src/main/java/org/apache/eventmesh/registry/nacos/NacosRegistryConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.apache.eventmesh.admin.server.registry;
+package org.apache.eventmesh.registry.nacos;
 
 import com.alibaba.nacos.api.PropertyKeyConst;
 import com.alibaba.nacos.client.naming.utils.UtilAndComs;
diff --git a/settings.gradle b/settings.gradle
index 6162f91f7..ec1eb4c5e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -127,3 +127,9 @@ include 'eventmesh-retry'
 include 'eventmesh-retry:eventmesh-retry-api'
 include 'eventmesh-retry:eventmesh-retry-rocketmq'
 include 'eventmesh-admin-server'
+include 'eventmesh-registry'
+include 'eventmesh-registry:eventmesh-registry-api'
+findProject(':eventmesh-registry:eventmesh-registry-api')?.name = 
'eventmesh-registry-api'
+include 'eventmesh-registry:eventmesh-registry-nacos'
+findProject(':eventmesh-registry:eventmesh-registry-nacos')?.name = 
'eventmesh-registry-nacos'
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to