This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-function
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-function by this 
push:
     new 848118735 EventMesh function admin (#4851)
848118735 is described below

commit 8481187356f2e4cf77b96d6f86c2cf458d34ec41
Author: sodaRyCN <[email protected]>
AuthorDate: Thu Apr 18 16:53:19 2024 +0800

    EventMesh function admin (#4851)
    
    * own
    
    * dependency
    
    * finish registry
---
 build.gradle                                       |   3 +
 eventmesh-admin-server/.gitignore                  |  42 +++++
 eventmesh-admin-server/build.gradle                |  16 ++
 eventmesh-admin-server/gradle.properties           |  16 ++
 .../com/apache/eventmesh/admin/server/Admin.java   |  24 +++
 .../eventmesh/admin/server/AdminException.java     |  11 ++
 .../apache/eventmesh/admin/server/AdminServer.java |  57 ++++++
 .../eventmesh/admin/server/ComponentLifeCycle.java |   6 +
 .../apache/eventmesh/admin/server/HeartBeat.java   |  12 ++
 .../server/registry/AbstractRegistryListener.java  |  14 ++
 .../EventMeshAdminServerConfiguration.java         |  32 ++++
 .../registry/EventMeshAdminServerRegisterInfo.java |  14 ++
 .../server/registry/NacosDiscoveryService.java     | 197 +++++++++++++++++++++
 .../registry/NacosRegistryConfiguration.java       |  59 ++++++
 .../eventmesh/admin/server/registry/Registry.java  |  70 ++++++++
 .../admin/server/registry/RegistryListener.java    |   5 +
 .../admin/server/registry/RegistryService.java     |  20 +++
 .../apache/eventmesh/admin/server/task/Job.java    |   8 +
 .../eventmesh/admin/server/task/JobState.java      |  10 ++
 .../eventmesh/admin/server/task/JobType.java       |   7 +
 .../eventmesh/admin/server/task/Position.java      |   5 +
 .../apache/eventmesh/admin/server/task/Task.java   |  17 ++
 .../apache/eventmesh/admin/server/web/Request.java |  22 +++
 .../eventmesh/admin/server/web/Response.java       |  33 ++++
 .../admin/server/web/ServerController.java         |   9 +
 ...eventmesh.admin.server.registry.RegistryService |  16 ++
 .../src/main/resources/application.yaml            |   8 +
 .../apache/eventmesh/common/utils/PagedList.java   |  52 ++++++
 .../eventmesh/spi/EventMeshExtensionType.java      |   1 +
 settings.gradle                                    |   1 +
 30 files changed, 787 insertions(+)

diff --git a/build.gradle b/build.gradle
index ec8bc4a45..98619ed84 100644
--- a/build.gradle
+++ b/build.gradle
@@ -574,6 +574,9 @@ subprojects {
             dependency "software.amazon.awssdk:s3:2.20.29"
             dependency "com.github.rholder:guava-retrying:2.0.0"
 
+            dependency 
"org.mybatis.spring.boot:mybatis-spring-boot-starter:2.3.2"
+            dependency "com.alibaba:druid-spring-boot-starter:1.2.22"
+            dependency 
"org.springframework.boot:spring-boot-starter-jetty:2.7.10"
         }
     }
 }
diff --git a/eventmesh-admin-server/.gitignore 
b/eventmesh-admin-server/.gitignore
new file mode 100644
index 000000000..b63da4551
--- /dev/null
+++ b/eventmesh-admin-server/.gitignore
@@ -0,0 +1,42 @@
+.gradle
+build/
+!gradle/wrapper/gradle-wrapper.jar
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+out/
+!**/src/main/**/out/
+!**/src/test/**/out/
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+bin/
+!**/src/main/**/bin/
+!**/src/test/**/bin/
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/eventmesh-admin-server/build.gradle 
b/eventmesh-admin-server/build.gradle
new file mode 100644
index 000000000..63ed5a456
--- /dev/null
+++ b/eventmesh-admin-server/build.gradle
@@ -0,0 +1,16 @@
+dependencies {
+       implementation project(":eventmesh-spi")
+       implementation project(":eventmesh-common")
+       implementation "com.alibaba.nacos:nacos-client"
+       implementation ("org.springframework.boot:spring-boot-starter-web") {
+               exclude group: "org.springframework.boot" ,module: 
"spring-boot-starter-tomcat"
+       }
+       implementation 'org.springframework.boot:spring-boot-starter-jetty'
+
+       implementation "org.mybatis.spring.boot:mybatis-spring-boot-starter"
+       // 
https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter
+       implementation "com.alibaba:druid-spring-boot-starter"
+       compileOnly 'org.projectlombok:lombok'
+       annotationProcessor 'org.projectlombok:lombok'
+}
+
diff --git a/eventmesh-admin-server/gradle.properties 
b/eventmesh-admin-server/gradle.properties
new file mode 100644
index 000000000..a9fd83fea
--- /dev/null
+++ b/eventmesh-admin-server/gradle.properties
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
new file mode 100644
index 000000000..1090f7b59
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/Admin.java
@@ -0,0 +1,24 @@
+package com.apache.eventmesh.admin.server;
+
+import org.apache.eventmesh.common.utils.PagedList;
+
+import com.apache.eventmesh.admin.server.task.Task;
+
+public interface Admin extends ComponentLifeCycle{
+    /**
+     * support for web or ops
+     **/
+    boolean createOrUpdateTask(Task task);
+    boolean deleteTask(Long id);
+    Task getTask(Long id);
+    // paged list
+    PagedList<Task> getTaskPaged(Task task);
+
+    /**
+     * support for task
+     */
+    void reportHeartbeat(HeartBeat heartBeat);
+
+
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java
new file mode 100644
index 000000000..eca5eeb0d
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminException.java
@@ -0,0 +1,11 @@
+package com.apache.eventmesh.admin.server;
+
+public class AdminException extends RuntimeException {
+    public AdminException(String message) {
+        super(message);
+    }
+
+    public AdminException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
new file mode 100644
index 000000000..a00182361
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/AdminServer.java
@@ -0,0 +1,57 @@
+package com.apache.eventmesh.admin.server;
+
+import 
com.apache.eventmesh.admin.server.registry.EventMeshAdminServerRegisterInfo;
+import com.apache.eventmesh.admin.server.registry.RegistryService;
+import org.apache.eventmesh.common.utils.PagedList;
+
+import com.apache.eventmesh.admin.server.task.Task;
+
+public class AdminServer implements Admin {
+
+    private RegistryService registryService;
+
+    private EventMeshAdminServerRegisterInfo registerInfo;
+
+    public AdminServer(RegistryService registryService, 
EventMeshAdminServerRegisterInfo registerInfo) {
+        this.registryService = registryService;
+        this.registerInfo = registerInfo;
+    }
+
+    public static final String ConfigurationKey = "admin-server";
+    @Override
+    public boolean createOrUpdateTask(Task task) {
+        return false;
+    }
+
+    @Override
+    public boolean deleteTask(Long id) {
+        return false;
+    }
+
+    @Override
+    public Task getTask(Long id) {
+        return null;
+    }
+
+    @Override
+    public PagedList<Task> getTaskPaged(Task task) {
+        return null;
+    }
+
+    @Override
+    public void reportHeartbeat(HeartBeat heartBeat) {
+
+    }
+
+    @Override
+    public void start() {
+
+        registryService.register(registerInfo);
+    }
+
+    @Override
+    public void destroy() {
+        registryService.unRegister(registerInfo);
+        registryService.shutdown();
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
new file mode 100644
index 000000000..76abd005b
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/ComponentLifeCycle.java
@@ -0,0 +1,6 @@
+package com.apache.eventmesh.admin.server;
+
+public interface ComponentLifeCycle {
+    void start();
+    void destroy();
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java
new file mode 100644
index 000000000..b8a28c4bd
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/HeartBeat.java
@@ -0,0 +1,12 @@
+package com.apache.eventmesh.admin.server;
+
+import com.apache.eventmesh.admin.server.task.JobState;
+import com.apache.eventmesh.admin.server.task.Position;
+
+public class HeartBeat {
+    private String address;
+    private String reportedTimeStamp;
+    private String jobID;
+    private Position position;
+    private JobState state;
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java
new file mode 100644
index 000000000..cdcc16979
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/AbstractRegistryListener.java
@@ -0,0 +1,14 @@
+package com.apache.eventmesh.admin.server.registry;
+
+public abstract class AbstractRegistryListener<T> implements RegistryListener {
+    protected abstract boolean checkType(Object data);
+    @Override
+    @SuppressWarnings("unchecked")
+    public void onChange(Object data) {
+        if (!checkType(data)) {
+            return;
+        }
+        process((T)data);
+    }
+    protected abstract void process(T data);
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java
new file mode 100644
index 000000000..dc436b28d
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerConfiguration.java
@@ -0,0 +1,32 @@
+package com.apache.eventmesh.admin.server.registry;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.common.config.Config;
+import org.apache.eventmesh.common.config.ConfigFiled;
+
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@Config(prefix = "eventMesh.admin")
+public class EventMeshAdminServerConfiguration extends CommonConfiguration {
+    @ConfigFiled(field = "server.http.port")
+    private int eventMeshHttpServerPort = 10000;
+
+    @ConfigFiled(field = "server.gRPC.port")
+    private int eventMeshGrpcServerPort = 10000;
+
+    @ConfigFiled(field = "registry.plugin.server-addr", notEmpty = true)
+    private String registryCenterAddr = "";
+
+    @ConfigFiled(field = "registry.plugin.type", notEmpty = true)
+    private String eventMeshRegistryPluginType = "nacos";
+
+    @ConfigFiled(field = "registry.plugin.username")
+    private String eventMeshRegistryPluginUsername = "";
+
+    @ConfigFiled(field = "registry.plugin.password")
+    private String eventMeshRegistryPluginPassword = "";
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java
new file mode 100644
index 000000000..c51ae6417
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/EventMeshAdminServerRegisterInfo.java
@@ -0,0 +1,14 @@
+package com.apache.eventmesh.admin.server.registry;
+
+import lombok.Data;
+
+import java.util.Map;
+
+@Data
+public class EventMeshAdminServerRegisterInfo {
+    private String eventMeshClusterName;
+    private String eventMeshName;
+    private String address;
+
+    private Map<String, String> metadata;
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java
new file mode 100644
index 000000000..cd4fb1103
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosDiscoveryService.java
@@ -0,0 +1,197 @@
+package com.apache.eventmesh.admin.server.registry;
+
+import com.alibaba.nacos.api.NacosFactory;
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.exception.NacosException;
+import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.listener.Event;
+import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.pojo.Instance;
+import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
+import com.alibaba.nacos.client.naming.utils.UtilAndComs;
+import com.apache.eventmesh.admin.server.AdminException;
+import com.apache.eventmesh.admin.server.AdminServer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.config.CommonConfiguration;
+import org.apache.eventmesh.common.config.ConfigService;
+import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class NacosDiscoveryService implements RegistryService {
+    private final AtomicBoolean initFlag = new AtomicBoolean(false);
+
+    private EventMeshAdminServerConfiguration adminConf;
+
+    private NacosRegistryConfiguration nacosConf;
+
+    private NamingService namingService;
+
+    private final Map<String, Map<RegistryListener, EventListener>> listeners 
= new HashMap<>();
+
+    private final Lock lock = new ReentrantLock();
+    private static final String GROUP_NAME = "admin";
+
+    /**
+     * Loads the admin-server and Nacos configuration and creates the Nacos
+     * naming client. Only the first call does any work; later calls return
+     * immediately because {@code initFlag} is already set.
+     *
+     * @throws AdminException if the admin configuration is missing or of the
+     *                        wrong type, or if the naming client cannot be
+     *                        created (the NacosException is kept as cause)
+     */
+    @Override
+    public void init() throws AdminException {
+        if (!initFlag.compareAndSet(false, true)) {
+            return;
+        }
+        CommonConfiguration configuration =
+            ConfigurationContextUtil.get(AdminServer.ConfigurationKey);
+        if (!(configuration instanceof EventMeshAdminServerConfiguration)) {
+            throw new AdminException(
+                "registry config instance is null or not match type");
+        }
+        adminConf = (EventMeshAdminServerConfiguration) configuration;
+
+        // Local name differs from the field so the assignment below is not
+        // hidden by shadowing.
+        NacosRegistryConfiguration loadedNacosConf = ConfigService.getInstance()
+            .buildConfigInstance(NacosRegistryConfiguration.class);
+        if (loadedNacosConf != null) {
+            this.nacosConf = loadedNacosConf;
+        }
+
+        try {
+            this.namingService =
+                NacosFactory.createNamingService(buildProperties());
+        } catch (NacosException e) {
+            log.error("[NacosRegistryService][start] error", e);
+            // Chain the cause so callers keep the original stack trace.
+            throw new AdminException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Builds the properties passed to
+     * {@code NacosFactory.createNamingService}. Server address and
+     * credentials come from the admin configuration; the remaining keys are
+     * copied from the Nacos configuration, when one was loaded, and are
+     * skipped if unset.
+     *
+     * @return the client properties, with every value stored as a String
+     */
+    private Properties buildProperties() {
+        Properties properties = new Properties();
+        properties.setProperty(PropertyKeyConst.SERVER_ADDR,
+            adminConf.getRegistryCenterAddr());
+        properties.setProperty(PropertyKeyConst.USERNAME,
+            adminConf.getEventMeshRegistryPluginUsername());
+        properties.setProperty(PropertyKeyConst.PASSWORD,
+            adminConf.getEventMeshRegistryPluginPassword());
+        if (nacosConf == null) {
+            return properties;
+        }
+
+        String endpoint = nacosConf.getEndpoint();
+        int separator = endpoint == null ? -1 : endpoint.indexOf(':');
+        if (separator >= 0) {
+            // "host:port" form: split it; the separate port setting is not
+            // consulted in this case.
+            properties.setProperty(PropertyKeyConst.ENDPOINT,
+                endpoint.substring(0, separator));
+            properties.setProperty(PropertyKeyConst.ENDPOINT_PORT,
+                endpoint.substring(separator + 1));
+        } else {
+            putIfPresent(properties, PropertyKeyConst.ENDPOINT, endpoint);
+            putIfPresent(properties, PropertyKeyConst.ENDPOINT_PORT,
+                nacosConf.getEndpointPort());
+        }
+
+        putIfPresent(properties, PropertyKeyConst.ACCESS_KEY,
+            nacosConf.getAccessKey());
+        putIfPresent(properties, PropertyKeyConst.SECRET_KEY,
+            nacosConf.getSecretKey());
+        putIfPresent(properties, PropertyKeyConst.CLUSTER_NAME,
+            nacosConf.getClusterName());
+        putIfPresent(properties, UtilAndComs.NACOS_NAMING_LOG_NAME,
+            nacosConf.getLogFileName());
+        putIfPresent(properties, UtilAndComs.NACOS_NAMING_LOG_LEVEL,
+            nacosConf.getLogLevel());
+        // Stored as text: Properties.getProperty() returns null for any
+        // non-String value, so an Integer entry would be invisible to code
+        // that reads it that way.
+        putIfPresent(properties, PropertyKeyConst.NAMING_POLLING_THREAD_COUNT,
+            nacosConf.getPollingThreadCount());
+        putIfPresent(properties, PropertyKeyConst.NAMESPACE,
+            nacosConf.getNamespace());
+        return properties;
+    }
+
+    /**
+     * Stores {@code String.valueOf(value)} under {@code key}, or does nothing
+     * when {@code value} is {@code null}.
+     */
+    private static void putIfPresent(Properties properties, String key,
+        Object value) {
+        if (value != null) {
+            properties.setProperty(key, String.valueOf(value));
+        }
+    }
+
+    @Override
+    public void shutdown() throws AdminException {
+        if (this.namingService != null) {
+            try {
+                namingService.shutDown();
+            } catch (NacosException e) {
+                log.warn("shutdown nacos naming service fail", e);
+            }
+        }
+    }
+
+    @Override
+    public void subscribe(RegistryListener listener, String serviceName) {
+        lock.lock();
+        try {
+            ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName);
+            Map<RegistryListener, EventListener> eventListenerMap = 
listeners.computeIfAbsent(serviceName, k -> new HashMap<>());
+            if (eventListenerMap.containsKey(listener)) {
+                log.warn("already use same listener subscribe service name {}" 
,serviceName);
+                return;
+            }
+            EventListener eventListener = listener::onChange;
+            List<String> clusters ;
+            if (serviceInfo.getClusters() == null || 
serviceInfo.getClusters().isEmpty()) {
+                clusters = new ArrayList<>();
+            } else {
+                clusters = 
Arrays.stream(serviceInfo.getClusters().split(",")).collect(Collectors.toList());
+            }
+            
namingService.subscribe(serviceInfo.getName(),serviceInfo.getGroupName(), 
clusters, eventListener);
+            eventListenerMap.put(listener, eventListener);
+        } catch (Exception e) {
+            log.error("subscribe service name {} fail", serviceName, e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Removes a listener previously added with
+     * {@link #subscribe(RegistryListener, String)}. Returns without calling
+     * Nacos when nothing is recorded for the given listener and service name.
+     * Failures are logged, not rethrown.
+     *
+     * @param registryListener the listener to remove
+     * @param serviceName      the service key that was passed to subscribe
+     */
+    @Override
+    public void unsubscribe(RegistryListener registryListener,
+        String serviceName) {
+        lock.lock();
+        try {
+            ServiceInfo serviceInfo = ServiceInfo.fromKey(serviceName);
+            Map<RegistryListener, EventListener> map =
+                listeners.get(serviceName);
+            if (map == null) {
+                return;
+            }
+            EventListener eventListener = map.get(registryListener);
+            if (eventListener == null) {
+                // This listener was never subscribed, so there is no Nacos
+                // EventListener to hand to unsubscribe().
+                return;
+            }
+            List<String> clusters;
+            String clusterNames = serviceInfo.getClusters();
+            if (clusterNames == null || clusterNames.isEmpty()) {
+                clusters = new ArrayList<>();
+            } else {
+                clusters = Arrays.stream(clusterNames.split(","))
+                    .collect(Collectors.toList());
+            }
+            namingService.unsubscribe(serviceInfo.getName(),
+                serviceInfo.getGroupName(), clusters, eventListener);
+            map.remove(registryListener);
+        } catch (Exception e) {
+            log.error("unsubscribe service name {} fail", serviceName, e);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public boolean register(EventMeshAdminServerRegisterInfo 
eventMeshRegisterInfo) throws AdminException {
+        try {
+            String[] ipPort = eventMeshRegisterInfo.getAddress().split(":");
+            if (ipPort.length < 2) {
+                return false;
+            }
+            Instance instance = new Instance();
+            
instance.setClusterName(eventMeshRegisterInfo.getEventMeshClusterName());
+            instance.setEnabled(true);
+            instance.setEphemeral(true);
+            instance.setHealthy(true);
+            instance.setWeight(1.0);
+            instance.setIp(ipPort[0]);
+            instance.setPort(Integer.parseInt(ipPort[1]));
+            instance.setMetadata(eventMeshRegisterInfo.getMetadata());
+            
namingService.registerInstance(eventMeshRegisterInfo.getEventMeshName(), 
GROUP_NAME, instance);
+            return true;
+        } catch (Exception e) {
+            log.error("register instance service {} group {} cluster {} fail", 
eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME, 
eventMeshRegisterInfo.getEventMeshClusterName(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Removes this admin server's instance from Nacos. The address is parsed
+     * the same way as in {@link #register(EventMeshAdminServerRegisterInfo)}
+     * so that the same ip, port and cluster are targeted.
+     *
+     * @param eventMeshRegisterInfo the info that was used to register
+     * @return {@code true} if the deregistration call completed;
+     *         {@code false} if the address is not {@code ip:port} or the
+     *         call failed (the failure is logged)
+     */
+    @Override
+    public boolean unRegister(
+        EventMeshAdminServerRegisterInfo eventMeshRegisterInfo)
+        throws AdminException {
+        try {
+            String[] ipPort = eventMeshRegisterInfo.getAddress().split(":");
+            if (ipPort.length < 2) {
+                return false;
+            }
+            // Deregister the exact ip/port/cluster that register() published;
+            // registering an empty Instance would leave the real one in place.
+            namingService.deregisterInstance(
+                eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME,
+                ipPort[0], Integer.parseInt(ipPort[1]),
+                eventMeshRegisterInfo.getEventMeshClusterName());
+            return true;
+        } catch (Exception e) {
+            log.error(
+                "unregister instance service {} group {} cluster {} fail",
+                eventMeshRegisterInfo.getEventMeshName(), GROUP_NAME,
+                eventMeshRegisterInfo.getEventMeshClusterName(), e);
+            return false;
+        }
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java
new file mode 100644
index 000000000..45932e9fd
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/NacosRegistryConfiguration.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.eventmesh.admin.server.registry;
+
+import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.client.naming.utils.UtilAndComs;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.eventmesh.common.config.Config;
+import org.apache.eventmesh.common.config.ConfigFiled;
+
+@Data
+@NoArgsConstructor
+@Config(prefix = "eventMesh.registry.nacos")
+public class NacosRegistryConfiguration {
+
+    @ConfigFiled(field = PropertyKeyConst.ENDPOINT)
+    private String endpoint;
+
+    @ConfigFiled(field = PropertyKeyConst.ENDPOINT_PORT)
+    private String endpointPort;
+
+    @ConfigFiled(field = PropertyKeyConst.ACCESS_KEY)
+    private String accessKey;
+
+    @ConfigFiled(field = PropertyKeyConst.SECRET_KEY)
+    private String secretKey;
+
+    @ConfigFiled(field = PropertyKeyConst.CLUSTER_NAME)
+    private String clusterName;
+
+    @ConfigFiled(field = PropertyKeyConst.NAMESPACE)
+    private String namespace;
+
+    @ConfigFiled(field = PropertyKeyConst.NAMING_POLLING_THREAD_COUNT)
+    private Integer pollingThreadCount = 
Runtime.getRuntime().availableProcessors() / 2 + 1;
+
+    @ConfigFiled(field = UtilAndComs.NACOS_NAMING_LOG_NAME)
+    private String logFileName;
+
+    @ConfigFiled(field = UtilAndComs.NACOS_NAMING_LOG_LEVEL)
+    private String logLevel;
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java
new file mode 100644
index 000000000..771b45f2e
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/Registry.java
@@ -0,0 +1,70 @@
+package com.apache.eventmesh.admin.server.registry;
+
+import com.apache.eventmesh.admin.server.AdminException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.spi.EventMeshExtensionFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Slf4j
+public class Registry implements RegistryService {
+    private static final Map<String, Registry> META_CACHE = new HashMap<>(16);
+    private RegistryService registryService;
+
+    private final AtomicBoolean initFlag = new AtomicBoolean(false);
+    private final AtomicBoolean shutdownFlag = new AtomicBoolean(false);
+
+    public static Registry getInstance(String registryPluginType) {
+        return META_CACHE.computeIfAbsent(registryPluginType, 
Registry::registryBuilder);
+    }
+
+    private static Registry registryBuilder(String registryPluginType) {
+        RegistryService registryServiceExt = 
EventMeshExtensionFactory.getExtension(RegistryService.class, 
registryPluginType);
+        if (registryServiceExt == null) {
+            String errorMsg = "can't load the metaService plugin, please 
check.";
+            log.error(errorMsg);
+            throw new RuntimeException(errorMsg);
+        }
+        Registry metaStorage = new Registry();
+        metaStorage.registryService = registryServiceExt;
+
+        return metaStorage;
+    }
+
+    /**
+     * Initializes the wrapped registry plugin. Only the first call is
+     * forwarded to the plugin; repeated calls return immediately.
+     *
+     * @throws AdminException propagated from the wrapped service's init()
+     */
+    @Override
+    public void init() throws AdminException {
+        // compareAndSet(false, true) succeeds only for the first caller, so a
+        // failed swap means initialization has already been triggered.
+        if (!initFlag.compareAndSet(false, true)) {
+            return;
+        }
+        this.registryService.init();
+    }
+
+    @Override
+    public void shutdown() throws AdminException {
+        if (shutdownFlag.compareAndSet(false, true)) {
+            this.registryService.shutdown();
+        }
+    }
+
+    @Override
+    public void subscribe(RegistryListener registryListener, String 
serviceName) {
+        this.registryService.subscribe(registryListener, serviceName);
+    }
+
+    @Override
+    public void unsubscribe(RegistryListener registryListener, String 
serviceName) {
+        this.registryService.unsubscribe(registryListener, serviceName);
+    }
+
+    @Override
+    public boolean register(EventMeshAdminServerRegisterInfo 
eventMeshRegisterInfo) throws AdminException {
+        return this.registryService.register(eventMeshRegisterInfo);
+    }
+
+    @Override
+    public boolean unRegister(EventMeshAdminServerRegisterInfo 
eventMeshUnRegisterInfo) throws AdminException {
+        return this.registryService.unRegister(eventMeshUnRegisterInfo);
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java
new file mode 100644
index 000000000..2d339497f
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryListener.java
@@ -0,0 +1,5 @@
+package com.apache.eventmesh.admin.server.registry;
+
+public interface RegistryListener {
+    void onChange(Object data);
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java
new file mode 100644
index 000000000..0cddd009a
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/registry/RegistryService.java
@@ -0,0 +1,20 @@
+package com.apache.eventmesh.admin.server.registry;
+
+import com.apache.eventmesh.admin.server.AdminException;
+import org.apache.eventmesh.spi.EventMeshExtensionType;
+import org.apache.eventmesh.spi.EventMeshSPI;
+
+@EventMeshSPI(eventMeshExtensionType = EventMeshExtensionType.REGISTRY)
+public interface RegistryService {
+    void init() throws AdminException;
+
+    void shutdown() throws AdminException;
+
+    void subscribe(RegistryListener registryListener, String serviceName);
+
+    void unsubscribe(RegistryListener registryListener, String serviceName);
+
+    boolean register(EventMeshAdminServerRegisterInfo eventMeshRegisterInfo) 
throws AdminException;
+
+    boolean unRegister(EventMeshAdminServerRegisterInfo 
eventMeshUnRegisterInfo) throws AdminException;
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java
new file mode 100644
index 000000000..1fe5b0897
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Job.java
@@ -0,0 +1,8 @@
+package com.apache.eventmesh.admin.server.task;
+
+public class Job {
+    private long id;
+    private long taskID;
+    private JobType type;
+    private JobState state;
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java
new file mode 100644
index 000000000..845d91c4a
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobState.java
@@ -0,0 +1,10 @@
+package com.apache.eventmesh.admin.server.task;
+
+public enum JobState {
+    INIT,
+    STARTED,
+    PAUSE,
+    COMPLETE,
+    DELETE,
+    FAIL
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java
new file mode 100644
index 000000000..b69480398
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/JobType.java
@@ -0,0 +1,7 @@
+package com.apache.eventmesh.admin.server.task;
+
+public enum JobType {
+    FULL,
+    INCREASE,
+    STRUCT_SYNC
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java
new file mode 100644
index 000000000..491f796a9
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Position.java
@@ -0,0 +1,5 @@
+package com.apache.eventmesh.admin.server.task;
+
+public class Position {
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java
new file mode 100644
index 000000000..4f6cb7cfe
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/task/Task.java
@@ -0,0 +1,17 @@
+package com.apache.eventmesh.admin.server.task;
+
+// task : job = 1 : m
+public class Task {
+    private long id;
+    private String name;
+    private String desc;
+    private String uid;
+    private String sourceUser;
+    private String sourcePasswd;
+    private String targetUser;
+    private String targetPasswd;
+    private int sourceType;
+    private int targetType;
+
+
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Request.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Request.java
new file mode 100644
index 000000000..d36c292d6
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Request.java
@@ -0,0 +1,22 @@
+package com.apache.eventmesh.admin.server.web;
+
+public class Request<T> {
+    private String uid;
+    private T data;
+
+    public String getUid() {
+        return uid;
+    }
+
+    public void setUid(String uid) {
+        this.uid = uid;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Response.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Response.java
new file mode 100644
index 000000000..4502ad792
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/Response.java
@@ -0,0 +1,33 @@
+package com.apache.eventmesh.admin.server.web;
+
+public class Response<T> {
+    private boolean success;
+
+    private String desc;
+
+    private T data;
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public void setDesc(String desc) {
+        this.desc = desc;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+}
diff --git 
a/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/ServerController.java
 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/ServerController.java
new file mode 100644
index 000000000..6f6e5fc7c
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/java/com/apache/eventmesh/admin/server/web/ServerController.java
@@ -0,0 +1,9 @@
+package com.apache.eventmesh.admin.server.web;
+
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/eventmesh/admin")
+public class ServerController {
+}
diff --git 
a/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService
 
b/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService
new file mode 100644
index 000000000..656fec8f3
--- /dev/null
+++ 
b/eventmesh-admin-server/src/main/resources/META-INF.eventmesh/com.apache.eventmesh.admin.server.registry.RegistryService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+nacos=com.apache.eventmesh.admin.server.registry.NacosDiscoveryService
\ No newline at end of file
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml 
b/eventmesh-admin-server/src/main/resources/application.yaml
new file mode 100644
index 000000000..aa72432b6
--- /dev/null
+++ b/eventmesh-admin-server/src/main/resources/application.yaml
@@ -0,0 +1,8 @@
+spring:
+  datasource:
+    url: jdbc:mysql://localhost:3306/test?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false
+    username: sodafang
+    password: asdfasdf
+    driver-class-name: com.mysql.cj.jdbc.Driver
+mybatis:
+  mapper-locations: classpath:mapper/*.xml
\ No newline at end of file
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/PagedList.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/PagedList.java
new file mode 100644
index 000000000..0046764ad
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/PagedList.java
@@ -0,0 +1,52 @@
+package org.apache.eventmesh.common.utils;
+
+import java.util.List;
+
+public class PagedList<T> {
+    private int totalSize;
+    private int totalPage;
+    private int size;
+    private int page;
+
+    private List<T> data;
+
+    public int getTotalSize() {
+        return totalSize;
+    }
+
+    public void setTotalSize(int totalSize) {
+        this.totalSize = totalSize;
+    }
+
+    public int getTotalPage() {
+        return totalPage;
+    }
+
+    public void setTotalPage(int totalPage) {
+        this.totalPage = totalPage;
+    }
+
+    public int getSize() {
+        return size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
+    }
+
+    public int getPage() {
+        return page;
+    }
+
+    public void setPage(int page) {
+        this.page = page;
+    }
+
+    public List<T> getData() {
+        return data;
+    }
+
+    public void setData(List<T> data) {
+        this.data = data;
+    }
+}
diff --git 
a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
 
b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
index f76379f9e..8de4e1ecf 100644
--- 
a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
+++ 
b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionType.java
@@ -26,6 +26,7 @@ public enum EventMeshExtensionType {
     CONNECTOR("connector"),
     STORAGE("storage"),
     META("metaStorage"),
+    REGISTRY("registryCenter"),
     SECURITY("security"),
     PROTOCOL("protocol"),
     METRICS("metrics"),
diff --git a/settings.gradle b/settings.gradle
index 645e6fb36..6162f91f7 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -126,3 +126,4 @@ include 'eventmesh-webhook:eventmesh-webhook-receive'
 include 'eventmesh-retry'
 include 'eventmesh-retry:eventmesh-retry-api'
 include 'eventmesh-retry:eventmesh-retry-rocketmq'
+include 'eventmesh-admin-server'


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to