This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fcd09118 [ISSUE #4577] Implement FilterEngine for EventMesh Filters 
(#4578)
2fcd09118 is described below

commit 2fcd091189b8ec97ff33e5c1fda0d785f9b5ad76
Author: mike_xwm <[email protected]>
AuthorDate: Tue Nov 28 11:31:35 2023 +0800

    [ISSUE #4577] Implement FilterEngine for EventMesh Filters (#4578)
    
    * update ci.yml
    
    * add filters
    
    * add filters
    
    * add filters
    
    * apply filters under http processor
    
    * update filterEngine
    
    * [ISSUE #4577] Implement FilterEngine for EventMesh Filters
    
    * refactor ci.yml
    
    * fix ci check error
    
    * fix ci check error
    
    * fix ci check error
    
    * fix ci check error
    
    * remove duplicate method in PatternBuilder
---
 .../protocol/http/common/EventMeshRetCode.java     |   2 +
 .../org/apache/eventmesh/filter/PatternEntry.java  |  13 +-
 .../filter/patternbuild/PatternBuilder.java        |  26 +---
 .../org/apache/eventmesh/api/meta/MetaService.java |   4 +-
 .../eventmesh/api/meta/MetaServiceListener.java    |  26 ++++
 .../meta/consul/service/ConsulMetaService.java     |   9 +-
 .../meta/etcd/service/EtcdMetaService.java         |   9 +-
 .../meta/nacos/service/NacosMetaService.java       |  96 +++++++++++++-
 .../nacos/service/NacosMetaServiceTest.java        |  20 ---
 .../zookeeper/service/ZookeeperMetaService.java    |   9 +-
 .../prometheus/metrics/PrometheusGrpcExporter.java |  16 +--
 .../prometheus/metrics/PrometheusHttpExporter.java | 142 ++++++++++-----------
 .../prometheus/metrics/PrometheusTcpExporter.java  |  40 +++---
 .../prometheus/utils/PrometheusExporterUtils.java  |   2 +-
 .../apache/eventmesh/openconnect/SourceWorker.java |  12 +-
 .../eventmesh/openconnect/util/CloudEventUtil.java |   2 +-
 eventmesh-runtime/build.gradle                     |   2 +
 .../eventmesh/runtime/boot/AbstractTCPServer.java  |   2 +-
 .../runtime/boot/EventMeshHTTPServer.java          |  15 +++
 .../eventmesh/runtime/boot/FilterEngine.java       | 136 ++++++++++++++++++++
 .../http/processor/SendAsyncEventProcessor.java    |  70 ++++++----
 .../protocol/http/push/AsyncHTTPPushRequest.java   |  12 ++
 .../apache/eventmesh/runtime/meta/MetaStorage.java |  17 +++
 23 files changed, 494 insertions(+), 188 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
index d3c7b067d..5f2177862 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
@@ -42,6 +42,8 @@ public enum EventMeshRetCode {
     EVENTMESH_HEARTBEAT_ERR(21, "eventMesh heartbeat error"),
     EVENTMESH_ACL_ERR(22, "eventMesh acl error"),
     EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(23, "eventMesh http msg send over 
the limit"),
+
+    EVENTMESH_FILTER_MSG_ERR(24, "eventMesh filter async msg error"),
     EVENTMESH_OPERATE_FAIL(100, "operate fail");
 
     private final Integer retCode;
diff --git 
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java 
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
index 73878fb65..5a2493a37 100644
--- 
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
+++ 
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
@@ -26,26 +26,31 @@ import com.fasterxml.jackson.databind.JsonNode;
 
 public class PatternEntry {
 
-    private final String patternPath;
+    private String patternName;
+
+    private String patternPath;
 
     private final List<Condition> conditionList = new ArrayList<>();
 
-    public PatternEntry(final String patternPath) {
+    public PatternEntry(final String patternName, final String patternPath) {
+        this.patternName = patternName;
         this.patternPath = patternPath;
     }
 
-    public void addRuleCondition(Condition patternCondition) {
+    public void addCondition(Condition patternCondition) {
         this.conditionList.add(patternCondition);
     }
 
     public String getPatternName() {
-        return "123";
+        return patternName;
     }
 
     public String getPatternPath() {
         return patternPath;
     }
 
+    // default filter type is OR
+    // todo: extend the filter type with AND
     public boolean match(JsonNode jsonElement) {
         for (final Condition patternCondition : conditionList) {
             if (patternCondition.match(jsonElement)) {
diff --git 
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
 
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
index 0a2fb0176..5f9a71d26 100644
--- 
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
+++ 
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
@@ -74,21 +74,7 @@ public class PatternBuilder {
             }
 
             // iter all requiredField
-            parseRequiredField("$." + key, value, pattern);
-            if (value.isEmpty()) {
-                // Empty array
-                throw new JsonException("INVALID_PATTERN_VALUE ");
-            }
-            PatternEntry patternEntry = new PatternEntry("$." + key);
-            for (final JsonNode objNode : value) {
-                // {
-                // "suffix":".jpg"
-                // }
-                Condition condition = parseCondition(objNode);
-                patternEntry.addRuleCondition(condition);
-            }
-
-            pattern.addRequiredFieldList(patternEntry);
+            parseRequiredField(key, "$." + key, value, pattern);
         }
 
         return pattern;
@@ -112,12 +98,12 @@ public class PatternBuilder {
                 String key = entry.getKey();
                 // [{"anything-but":"initializing"}] [{"anything-but":123}]}
                 JsonNode value = entry.getValue();
-                PatternEntry patternEntry = new PatternEntry(elepath + "." + 
key);
+                PatternEntry patternEntry = new PatternEntry(key, elepath + 
"." + key);
                 if (!value.isObject()) {
                     if (value.isArray()) {
                         for (JsonNode node11 : value) {
                             // {"anything-but":"initializing"}
-                            
patternEntry.addRuleCondition(parseCondition(node11));
+                            patternEntry.addCondition(parseCondition(node11));
                         }
                     }
                     pattern.addDataList(patternEntry);
@@ -146,15 +132,15 @@ public class PatternBuilder {
         return null;
     }
 
-    private static void parseRequiredField(String path, JsonNode jsonNode, 
Pattern pattern) {
+    private static void parseRequiredField(String patternName, String 
patternPath, JsonNode jsonNode, Pattern pattern) {
         if (jsonNode.isEmpty()) {
             // Empty array
             throw new JsonException("INVALID_PATTERN_VALUE ");
         }
-        PatternEntry patternEntry = new PatternEntry(path);
+        PatternEntry patternEntry = new PatternEntry(patternName, patternPath);
         for (final JsonNode objNode : jsonNode) {
             Condition condition = parseCondition(objNode);
-            patternEntry.addRuleCondition(condition);
+            patternEntry.addCondition(condition);
         }
 
         pattern.addRequiredFieldList(patternEntry);
diff --git 
a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
 
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
index feb0efe5a..6dcf4fc13 100644
--- 
a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
+++ 
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
@@ -54,7 +54,9 @@ public interface MetaService {
 
     void registerMetadata(Map<String, String> metadataMap);
 
-    String getMetaData(String key);
+    Map<String, String> getMetaData(String key, boolean fuzzyEnabled);
+
+    void getMetaDataWithListener(MetaServiceListener metaServiceListener, 
String key);
 
     void updateMetaData(Map<String, String> metadataMap);
 
diff --git 
a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java
 
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java
new file mode 100644
index 000000000..b304de780
--- /dev/null
+++ 
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.meta;
+
+/**
+ * MetaServiceListener
+ */
+public interface MetaServiceListener {
+
+    void onChange(String key, String value);
+}
diff --git 
a/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
 
b/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
index b2ba4564d..2d9b921ef 100644
--- 
a/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
+++ 
b/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.consul.service;
 
 import org.apache.eventmesh.api.exception.MetaException;
 import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
 import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
@@ -162,6 +163,12 @@ public class ConsulMetaService implements MetaService {
         return true;
     }
 
+    // todo: to be implemented
+    @Override
+    public void getMetaDataWithListener(MetaServiceListener 
metaServiceListener, String key) {
+
+    }
+
     @Override
     public List<EventMeshDataInfo> findEventMeshInfoByCluster(String 
clusterName) throws MetaException {
         HealthServicesRequest request = 
HealthServicesRequest.newBuilder().setPassing(true).setToken(token).build();
@@ -192,7 +199,7 @@ public class ConsulMetaService implements MetaService {
     }
 
     @Override
-    public String getMetaData(String key) {
+    public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
         return null;
     }
 
diff --git 
a/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
 
b/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
index 7f0c95507..297900cb9 100644
--- 
a/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
+++ 
b/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.etcd.service;
 
 import org.apache.eventmesh.api.exception.MetaException;
 import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
 import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
@@ -196,10 +197,16 @@ public class EtcdMetaService implements MetaService {
     }
 
     @Override
-    public String getMetaData(String key) {
+    public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
         return null;
     }
 
+    // todo: to be implemented
+    @Override
+    public void getMetaDataWithListener(MetaServiceListener 
metaServiceListener, String key) {
+
+    }
+
     @Override
     public void updateMetaData(Map<String, String> metadataMap) {
         String etcdMetaKey = instanceIp + "-" + group;
diff --git 
a/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
 
b/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
index 5eaac5a32..104e6a0fb 100644
--- 
a/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
+++ 
b/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.nacos.service;
 
 import org.apache.eventmesh.api.exception.MetaException;
 import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
 import org.apache.eventmesh.api.meta.config.EventMeshMetaConfig;
 import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
@@ -31,9 +32,18 @@ import 
org.apache.eventmesh.meta.nacos.config.NacosMetaStorageConfiguration;
 import org.apache.eventmesh.meta.nacos.constant.NacosConstant;
 
 import org.apache.commons.lang3.StringUtils;
-
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -41,16 +51,19 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.alibaba.nacos.api.NacosFactory;
 import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.listener.Listener;
 import com.alibaba.nacos.api.exception.NacosException;
 import com.alibaba.nacos.api.naming.NamingService;
 import com.alibaba.nacos.api.naming.pojo.Instance;
 import com.alibaba.nacos.client.naming.utils.UtilAndComs;
 import com.alibaba.nacos.common.utils.CollectionUtils;
 import com.alibaba.nacos.common.utils.JacksonUtils;
+import com.fasterxml.jackson.databind.JsonNode;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -85,6 +98,8 @@ public class NacosMetaService implements MetaService {
 
     private ConcurrentMap<String, EventMeshRegisterInfo> 
eventMeshRegisterInfoMap;
 
+    private MetaServiceListener metaServiceListener;
+
     @Override
     public void init() throws MetaException {
 
@@ -173,6 +188,26 @@ public class NacosMetaService implements MetaService {
         return properties;
     }
 
+    @Override
+    public void getMetaDataWithListener(MetaServiceListener 
metaServiceListener, String key) {
+        try {
+            nacosConfigService.addListener(key, group, new Listener() {
+
+                @Override
+                public Executor getExecutor() {
+                    return null;
+                }
+
+                @Override
+                public void receiveConfigInfo(String configInfo) {
+                    metaServiceListener.onChange(key, configInfo);
+                }
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("add nacos listener for key " + key + 
"error", e);
+        }
+    }
+
     @Override
     public void shutdown() throws MetaException {
         if (!initStatus.compareAndSet(true, false)) {
@@ -242,16 +277,67 @@ public class NacosMetaService implements MetaService {
         }
     }
 
+    // implement with http
     @Override
-    public String getMetaData(String key) {
-        try {
-            return this.nacosConfigService.getConfig(key, group, 5000L);
-        } catch (NacosException e) {
+    public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
+        if (fuzzyEnabled) {
+            key = key + "*";
+        }
+        int pageNo = 1;
+        int pageSize = 100;
+
+        Map<String, String> result = new HashMap<>();
+        Map<String, String> tmpMap;
+        do {
+            tmpMap = getResultFromNacos(pageNo, pageSize, key, group, 
fuzzyEnabled);
+            result.putAll(tmpMap);
+        } while (!(tmpMap.size() < pageSize));
+        return result;
+    }
+
+    private Map<String, String> getResultFromNacos(int pageNo, int pageSize, 
String key, String group, boolean fuzzyEnabled) {
+        Map<String, String> result = new HashMap<>();
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            URIBuilder uriBuilder = new URIBuilder("http://"; + serverAddr + 
"/nacos/v1/cs/configs")
+                .setParameter("dataId", key)
+                .setParameter("group", group)
+                .setParameter("pageNo", String.valueOf(pageNo))
+                .setParameter("pageSize", String.valueOf(pageSize));
+            if (fuzzyEnabled) {
+                uriBuilder.setParameter("search", "blur");
+            }
+            URI uri = uriBuilder.build();
+            HttpGet httpGet = new HttpGet(uri);
+            try (CloseableHttpResponse closeableHttpResponse = 
httpclient.execute(httpGet)) {
+                if (closeableHttpResponse.getStatusLine().getStatusCode() == 
200) {
+                    String response = 
EntityUtils.toString(closeableHttpResponse.getEntity(), StandardCharsets.UTF_8);
+                    result = processResponse(response);
+                }
+            } catch (Exception e) {
+                log.error("get metaData fail", e);
+                throw new RuntimeException(e);
+            }
+            return result;
+        } catch (Exception e) {
             log.error("get metaData fail", e);
             throw new RuntimeException(e);
         }
     }
 
+    private Map<String, String> processResponse(String response) {
+        Map<String, String> result = new HashMap<>();
+        JsonNode jsonNode = JacksonUtils.toObj(response);
+        JsonNode jsonNodeArray = jsonNode.get("pageItems");
+        if (jsonNodeArray.isArray()) {
+            for (JsonNode js : jsonNodeArray) {
+                String key = js.get("dataId").asText();
+                String value = js.get("content").asText();
+                result.put(key, value);
+            }
+        }
+        return result;
+    }
+
     @Override
     public void updateMetaData(Map<String, String> metadataMap) {
         String protocol = 
metadataMap.get(EventMeshMetaConfig.EVENT_MESH_PROTO);
diff --git 
a/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
 
b/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
index 14be0578c..1065ab4be 100644
--- 
a/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
+++ 
b/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.registry.nacos.service;
 
 import static org.apache.eventmesh.common.Constants.HTTP;
 
-import org.apache.eventmesh.api.exception.MetaException;
 import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
 import org.apache.eventmesh.common.config.CommonConfiguration;
@@ -106,23 +105,4 @@ public class NacosMetaServiceTest {
         
Assertions.assertFalse((Boolean.parseBoolean(initStatusField.toString())));
         
Assertions.assertFalse((Boolean.parseBoolean(startStatusField.toString())));
     }
-
-    @Test
-    public void testRegister() {
-        Assertions.assertThrows(MetaException.class, () -> {
-            nacosMetaService.init();
-            nacosMetaService.start();
-            nacosMetaService.register(eventMeshRegisterInfo);
-        });
-    }
-
-    @Test
-    public void testUnRegister() {
-        Assertions.assertThrows(MetaException.class, () -> {
-            nacosMetaService.init();
-            nacosMetaService.start();
-            nacosMetaService.unRegister(eventMeshUnRegisterInfo);
-        });
-    }
-
 }
diff --git 
a/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
 
b/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
index 4dba3effe..5cee22d9c 100644
--- 
a/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
+++ 
b/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.zookeeper.service;
 
 import org.apache.eventmesh.api.exception.MetaException;
 import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
 import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
@@ -288,10 +289,16 @@ public class ZookeeperMetaService implements MetaService {
     }
 
     @Override
-    public String getMetaData(String key) {
+    public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
         return null;
     }
 
+    // todo: to be implemented
+    @Override
+    public void getMetaDataWithListener(MetaServiceListener 
metaServiceListener, String key) {
+
+    }
+
     @Override
     public void updateMetaData(Map<String, String> metadataMap) {
 
diff --git 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
index 6d26197a2..ca61f9add 100644
--- 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
@@ -41,14 +41,14 @@ public class PrometheusGrpcExporter {
      * Map structure : [metric name, description of name] -> the method of get 
corresponding metric.
      */
     private final Map<String[], Function<GrpcSummaryMetrics, Number>> 
paramPairs = ImmutableMap
-            .<String[], Function<GrpcSummaryMetrics, Number>>builder()
-            .put(join("sub.topic.num", "get sub topic num."), 
GrpcSummaryMetrics::getSubscribeTopicNum)
-            .put(join("retry.queue.size", "get size of retry queue."), 
GrpcSummaryMetrics::getRetrySize)
-            .put(join("server.tps", "get size of retry queue."), 
GrpcSummaryMetrics::getClient2EventMeshTPS)
-            .put(join("client.tps", "get tps of eventMesh to mq."), 
GrpcSummaryMetrics::getEventMesh2ClientTPS)
-            .put(join("mq.provider.tps", "get tps of eventMesh to mq."), 
GrpcSummaryMetrics::getEventMesh2MqTPS)
-            .put(join("mq.consumer.tps", "get tps of eventMesh to mq."), 
GrpcSummaryMetrics::getMq2EventMeshTPS)
-            .build();
+        .<String[], Function<GrpcSummaryMetrics, Number>>builder()
+        .put(join("sub.topic.num", "get sub topic num."), 
GrpcSummaryMetrics::getSubscribeTopicNum)
+        .put(join("retry.queue.size", "get size of retry queue."), 
GrpcSummaryMetrics::getRetrySize)
+        .put(join("server.tps", "get size of retry queue."), 
GrpcSummaryMetrics::getClient2EventMeshTPS)
+        .put(join("client.tps", "get tps of eventMesh to mq."), 
GrpcSummaryMetrics::getEventMesh2ClientTPS)
+        .put(join("mq.provider.tps", "get tps of eventMesh to mq."), 
GrpcSummaryMetrics::getEventMesh2MqTPS)
+        .put(join("mq.consumer.tps", "get tps of eventMesh to mq."), 
GrpcSummaryMetrics::getMq2EventMeshTPS)
+        .build();
 
     public static void export(final String meterName, final GrpcSummaryMetrics 
summaryMetrics) {
         final Meter meter = GlobalMeterProvider.getMeter(meterName);
diff --git 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
index fc5a65417..a09cb074d 100644
--- 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
@@ -40,81 +40,81 @@ public class PrometheusHttpExporter {
      * Map structure : [metric name, description of name] -> the method of get 
corresponding metric.
      */
     private final Map<String[], Function<HttpSummaryMetrics, Number>> 
paramPairs = ImmutableMap
-            .<String[], Function<HttpSummaryMetrics, Number>>builder()
-            // maxHTTPTPS
-            .put(join("eventmesh.http.request.tps.max", "max TPS of HTTP."), 
HttpSummaryMetrics::maxHTTPTPS)
-            // avgHTTPTPS
-            .put(join("eventmesh.http.request.tps.avg", "avg TPS of HTTP."), 
HttpSummaryMetrics::avgHTTPTPS)
-            // maxHTTPCost
-            .put(join("eventmesh.http.request.cost.max", "max cost of HTTP."), 
HttpSummaryMetrics::maxHTTPCost)
-            // avgHTTPCost
-            .put(join("eventmesh.http.request.cost.avg", "avg cost of HTTP."), 
HttpSummaryMetrics::avgHTTPCost)
-            // avgHTTPBodyDecodeCost
-            .put(join("eventmesh.http.body.decode.cost.avg", "avg body decode 
cost of HTTP."), HttpSummaryMetrics::avgHTTPBodyDecodeCost)
-            // httpDiscard
-            .put(join("eventmesh.http.request.discard.num", "http request 
discard num."), HttpSummaryMetrics::getHttpDiscard)
-            // maxBatchSendMsgTPS
-            .put(join("eventmesh.batch.send.message.tps.max", "max of batch 
send message tps."), HttpSummaryMetrics::maxSendBatchMsgTPS)
-            // avgBatchSendMsgTPS
-            .put(join("eventmesh.batch.send.message.tps.avg", "avg of batch 
send message tps."), HttpSummaryMetrics::avgSendBatchMsgTPS)
-            // sum
-            .put(join("eventmesh.batch.send.message.num", "sum of batch send 
message number."), HttpSummaryMetrics::getSendBatchMsgNumSum)
-            // sumFail
-            .put(join("eventmesh.batch.send.message.fail.num", "sum of batch 
send message fail message number."),
-                    HttpSummaryMetrics::getSendBatchMsgFailNumSum)
-            // sumFailRate
-            .put(join("eventmesh.batch.send.message.fail.rate", "send batch 
message fail rate."), HttpSummaryMetrics::getSendBatchMsgFailRate)
-            // discard
-            .put(join("eventmesh.batch.send.message.discard.num", "sum of send 
batch message discard number."),
-                    HttpSummaryMetrics::getSendBatchMsgDiscardNumSum)
-            // maxSendMsgTPS
-            .put(join("eventmesh.send.message.tps.max", "max of send message 
tps."), HttpSummaryMetrics::maxSendMsgTPS)
-            // avgSendMsgTPS
-            .put(join("eventmesh.send.message.tps.avg", "avg of send message 
tps."), HttpSummaryMetrics::avgSendMsgTPS)
-            // sum
-            .put(join("eventmesh.send.message.num", "sum of send message 
number."), HttpSummaryMetrics::getSendMsgNumSum)
-            // sumFail
-            .put(join("eventmesh.send.message.fail.num", "sum of send message 
fail number."), HttpSummaryMetrics::getSendMsgFailNumSum)
-            // sumFailRate
-            .put(join("eventmesh.send.message.fail.rate", "send message fail 
rate."), HttpSummaryMetrics::getSendMsgFailRate)
-            // replyMsg
-            .put(join("eventmesh.reply.message.num", "sum of reply message 
number."), HttpSummaryMetrics::getReplyMsgNumSum)
-            // replyFail
-            .put(join("eventmesh.reply.message.fail.num", "sum of reply 
message fail number."), HttpSummaryMetrics::getReplyMsgFailNumSum)
-            // maxPushMsgTPS
-            .put(join("eventmesh.push.message.tps.max", "max of push message 
tps."), HttpSummaryMetrics::maxPushMsgTPS)
-            // avgPushMsgTPS
-            .put(join("eventmesh.push.message.tps.avg", "avg of push message 
tps."), HttpSummaryMetrics::avgPushMsgTPS)
-            // sum
-            .put(join("eventmesh.http.push.message.num", "sum of http push 
message number."), HttpSummaryMetrics::getHttpPushMsgNumSum)
-            // sumFail
-            .put(join("eventmesh.http.push.message.fail.num", "sum of http 
push message fail number."), HttpSummaryMetrics::getHttpPushFailNumSum)
-            // sumFailRate
-            .put(join("eventmesh.http.push.message.fail.rate", "http push 
message fail rate."), HttpSummaryMetrics::getHttpPushMsgFailRate)
-            // maxClientLatency
-            .put(join("eventmesh.http.push.latency.max", "max of http push 
latency."), HttpSummaryMetrics::maxHTTPPushLatency)
-            // avgClientLatency
-            .put(join("eventmesh.http.push.latency.avg", "avg of http push 
latency."), HttpSummaryMetrics::avgHTTPPushLatency)
-            // batchMsgQ
-            .put(join("eventmesh.batch.message.queue.size", "size of batch 
message queue."), HttpSummaryMetrics::getBatchMsgQueueSize)
-            // sendMsgQ
-            .put(join("eventmesh.send.message.queue.size", "size of send 
message queue."), HttpSummaryMetrics::getSendMsgQueueSize)
-            // pushMsgQ
-            .put(join("eventmesh.push.message.queue.size", "size of push 
message queue."), HttpSummaryMetrics::getPushMsgQueueSize)
-            // httpRetryQ
-            .put(join("eventmesh.http.retry.queue.size", "size of http retry 
queue."), HttpSummaryMetrics::getHttpRetryQueueSize)
-            // batchAvgSend2MQCost
-            .put(join("eventmesh.batch.send.message.cost.avg", "avg of batch 
send message cost."), HttpSummaryMetrics::avgBatchSendMsgCost)
-            // avgSend2MQCost
-            .put(join("eventmesh.send.message.cost.avg", "avg of send message 
cost."), HttpSummaryMetrics::avgSendMsgCost)
-            // avgReply2MQCost
-            .put(join("eventmesh.reply.message.cost.avg", "avg of reply 
message cost."), HttpSummaryMetrics::avgReplyMsgCost)
-            .build();
+        .<String[], Function<HttpSummaryMetrics, Number>>builder()
+        // maxHTTPTPS
+        .put(join("eventmesh.http.request.tps.max", "max TPS of HTTP."), 
HttpSummaryMetrics::maxHTTPTPS)
+        // avgHTTPTPS
+        .put(join("eventmesh.http.request.tps.avg", "avg TPS of HTTP."), 
HttpSummaryMetrics::avgHTTPTPS)
+        // maxHTTPCost
+        .put(join("eventmesh.http.request.cost.max", "max cost of HTTP."), 
HttpSummaryMetrics::maxHTTPCost)
+        // avgHTTPCost
+        .put(join("eventmesh.http.request.cost.avg", "avg cost of HTTP."), 
HttpSummaryMetrics::avgHTTPCost)
+        // avgHTTPBodyDecodeCost
+        .put(join("eventmesh.http.body.decode.cost.avg", "avg body decode cost 
of HTTP."), HttpSummaryMetrics::avgHTTPBodyDecodeCost)
+        // httpDiscard
+        .put(join("eventmesh.http.request.discard.num", "http request discard 
num."), HttpSummaryMetrics::getHttpDiscard)
+        // maxBatchSendMsgTPS
+        .put(join("eventmesh.batch.send.message.tps.max", "max of batch send 
message tps."), HttpSummaryMetrics::maxSendBatchMsgTPS)
+        // avgBatchSendMsgTPS
+        .put(join("eventmesh.batch.send.message.tps.avg", "avg of batch send 
message tps."), HttpSummaryMetrics::avgSendBatchMsgTPS)
+        // sum
+        .put(join("eventmesh.batch.send.message.num", "sum of batch send 
message number."), HttpSummaryMetrics::getSendBatchMsgNumSum)
+        // sumFail
+        .put(join("eventmesh.batch.send.message.fail.num", "sum of batch send 
message fail message number."),
+            HttpSummaryMetrics::getSendBatchMsgFailNumSum)
+        // sumFailRate
+        .put(join("eventmesh.batch.send.message.fail.rate", "send batch 
message fail rate."), HttpSummaryMetrics::getSendBatchMsgFailRate)
+        // discard
+        .put(join("eventmesh.batch.send.message.discard.num", "sum of send 
batch message discard number."),
+            HttpSummaryMetrics::getSendBatchMsgDiscardNumSum)
+        // maxSendMsgTPS
+        .put(join("eventmesh.send.message.tps.max", "max of send message 
tps."), HttpSummaryMetrics::maxSendMsgTPS)
+        // avgSendMsgTPS
+        .put(join("eventmesh.send.message.tps.avg", "avg of send message 
tps."), HttpSummaryMetrics::avgSendMsgTPS)
+        // sum
+        .put(join("eventmesh.send.message.num", "sum of send message 
number."), HttpSummaryMetrics::getSendMsgNumSum)
+        // sumFail
+        .put(join("eventmesh.send.message.fail.num", "sum of send message fail 
number."), HttpSummaryMetrics::getSendMsgFailNumSum)
+        // sumFailRate
+        .put(join("eventmesh.send.message.fail.rate", "send message fail 
rate."), HttpSummaryMetrics::getSendMsgFailRate)
+        // replyMsg
+        .put(join("eventmesh.reply.message.num", "sum of reply message 
number."), HttpSummaryMetrics::getReplyMsgNumSum)
+        // replyFail
+        .put(join("eventmesh.reply.message.fail.num", "sum of reply message 
fail number."), HttpSummaryMetrics::getReplyMsgFailNumSum)
+        // maxPushMsgTPS
+        .put(join("eventmesh.push.message.tps.max", "max of push message 
tps."), HttpSummaryMetrics::maxPushMsgTPS)
+        // avgPushMsgTPS
+        .put(join("eventmesh.push.message.tps.avg", "avg of push message 
tps."), HttpSummaryMetrics::avgPushMsgTPS)
+        // sum
+        .put(join("eventmesh.http.push.message.num", "sum of http push message 
number."), HttpSummaryMetrics::getHttpPushMsgNumSum)
+        // sumFail
+        .put(join("eventmesh.http.push.message.fail.num", "sum of http push 
message fail number."), HttpSummaryMetrics::getHttpPushFailNumSum)
+        // sumFailRate
+        .put(join("eventmesh.http.push.message.fail.rate", "http push message 
fail rate."), HttpSummaryMetrics::getHttpPushMsgFailRate)
+        // maxClientLatency
+        .put(join("eventmesh.http.push.latency.max", "max of http push 
latency."), HttpSummaryMetrics::maxHTTPPushLatency)
+        // avgClientLatency
+        .put(join("eventmesh.http.push.latency.avg", "avg of http push 
latency."), HttpSummaryMetrics::avgHTTPPushLatency)
+        // batchMsgQ
+        .put(join("eventmesh.batch.message.queue.size", "size of batch message 
queue."), HttpSummaryMetrics::getBatchMsgQueueSize)
+        // sendMsgQ
+        .put(join("eventmesh.send.message.queue.size", "size of send message 
queue."), HttpSummaryMetrics::getSendMsgQueueSize)
+        // pushMsgQ
+        .put(join("eventmesh.push.message.queue.size", "size of push message 
queue."), HttpSummaryMetrics::getPushMsgQueueSize)
+        // httpRetryQ
+        .put(join("eventmesh.http.retry.queue.size", "size of http retry 
queue."), HttpSummaryMetrics::getHttpRetryQueueSize)
+        // batchAvgSend2MQCost
+        .put(join("eventmesh.batch.send.message.cost.avg", "avg of batch send 
message cost."), HttpSummaryMetrics::avgBatchSendMsgCost)
+        // avgSend2MQCost
+        .put(join("eventmesh.send.message.cost.avg", "avg of send message 
cost."), HttpSummaryMetrics::avgSendMsgCost)
+        // avgReply2MQCost
+        .put(join("eventmesh.reply.message.cost.avg", "avg of reply message 
cost."), HttpSummaryMetrics::avgReplyMsgCost)
+        .build();
 
     public void export(String name, HttpSummaryMetrics summaryMetrics) {
         Meter meter = GlobalMeterProvider.getMeter(name);
         paramPairs.forEach((metricInfo, getMetric) -> observeOfValue(meter, 
metricInfo[0], metricInfo[1],
-                HTTP, summaryMetrics, getMetric, HttpSummaryMetrics.class));
+            HTTP, summaryMetrics, getMetric, HttpSummaryMetrics.class));
     }
 
 }
diff --git 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
index a8eff9de4..ff9eef9a4 100644
--- 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
@@ -41,29 +41,29 @@ public class PrometheusTcpExporter {
      * Map structure : [metric name, description of name] -> the method of get 
corresponding metric.
      */
     private final Map<String[], Function<TcpSummaryMetrics, Number>> 
paramPairs = ImmutableMap
-            .<String[], Function<TcpSummaryMetrics, Number>>builder()
-            // retryQueueSize
-            .put(join("retry.queue.size", "get size of retry queue."), 
TcpSummaryMetrics::getRetrySize)
-            // client2eventMeshTPS
-            .put(join("server.tps", "get tps of client to eventMesh."), 
TcpSummaryMetrics::getClient2eventMeshTPS)
-            // eventMesh2mqTPS
-            .put(join("mq.provider.tps", "get tps of eventMesh to mq."), 
TcpSummaryMetrics::getEventMesh2mqTPS)
-            // mq2eventMeshTPS
-            .put(join("mq.consumer.tps", "get tps of mq to eventMesh."), 
TcpSummaryMetrics::getMq2eventMeshTPS)
-            // eventMesh2clientTPS
-            .put(join("client.tps", "get tps of eventMesh to client."), 
TcpSummaryMetrics::getEventMesh2clientTPS)
-            // allTPS
-            .put(join("all.tps", "get all TPS."), TcpSummaryMetrics::getAllTPS)
-            // EventMeshTcpConnectionHandler.connections
-            .put(join("connection.num", 
"EventMeshTcpConnectionHandler.connections."), 
TcpSummaryMetrics::getAllConnections)
-            // subTopicNum
-            .put(join("sub.topic.num", "get sub topic num."), 
TcpSummaryMetrics::getSubTopicNum)
-            .build();
+        .<String[], Function<TcpSummaryMetrics, Number>>builder()
+        // retryQueueSize
+        .put(join("retry.queue.size", "get size of retry queue."), 
TcpSummaryMetrics::getRetrySize)
+        // client2eventMeshTPS
+        .put(join("server.tps", "get tps of client to eventMesh."), 
TcpSummaryMetrics::getClient2eventMeshTPS)
+        // eventMesh2mqTPS
+        .put(join("mq.provider.tps", "get tps of eventMesh to mq."), 
TcpSummaryMetrics::getEventMesh2mqTPS)
+        // mq2eventMeshTPS
+        .put(join("mq.consumer.tps", "get tps of mq to eventMesh."), 
TcpSummaryMetrics::getMq2eventMeshTPS)
+        // eventMesh2clientTPS
+        .put(join("client.tps", "get tps of eventMesh to client."), 
TcpSummaryMetrics::getEventMesh2clientTPS)
+        // allTPS
+        .put(join("all.tps", "get all TPS."), TcpSummaryMetrics::getAllTPS)
+        // EventMeshTcpConnectionHandler.connections
+        .put(join("connection.num", 
"EventMeshTcpConnectionHandler.connections."), 
TcpSummaryMetrics::getAllConnections)
+        // subTopicNum
+        .put(join("sub.topic.num", "get sub topic num."), 
TcpSummaryMetrics::getSubTopicNum)
+        .build();
 
     public void export(final String meterName, final TcpSummaryMetrics 
summaryMetrics) {
         final Meter meter = GlobalMeterProvider.getMeter(meterName);
         paramPairs.forEach(
-                (metricInfo, getMetric) -> observeOfValue(meter, 
METRICS_TCP_PREFIX + metricInfo[0], metricInfo[1],
-                        TCP, summaryMetrics, getMetric, 
TcpSummaryMetrics.class));
+            (metricInfo, getMetric) -> observeOfValue(meter, 
METRICS_TCP_PREFIX + metricInfo[0], metricInfo[1],
+                TCP, summaryMetrics, getMetric, TcpSummaryMetrics.class));
     }
 }
diff --git 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
index 7f17606ad..1a772020c 100644
--- 
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
+++ 
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
@@ -70,7 +70,7 @@ public class PrometheusExporterUtils {
      * @return
      */
     public static String[] join(String metricName, String desc) {
-        return new String[] {metricName, desc};
+        return new String[]{metricName, desc};
     }
 
 }
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index d95f9c189..71c3fea4d 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -237,12 +237,12 @@ public class SourceWorker implements ConnectorWorker {
         CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1();
 
         cloudEventBuilder.withId(UUID.randomUUID().toString())
-                .withSubject(config.getPubSubConfig().getSubject())
-                .withSource(URI.create("/"))
-                .withDataContentType("application/cloudevents+json")
-                .withType(CLOUD_EVENTS_PROTOCOL_NAME)
-                
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
-                .withExtension("ttl", 10000);
+            .withSubject(config.getPubSubConfig().getSubject())
+            .withSource(URI.create("/"))
+            .withDataContentType("application/cloudevents+json")
+            .withType(CLOUD_EVENTS_PROTOCOL_NAME)
+            
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
+            .withExtension("ttl", 10000);
 
         for (String key : connectRecord.getExtensions().keySet()) {
             if 
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
diff --git 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
index 54be8acc0..5691d43fd 100644
--- 
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
+++ 
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
@@ -80,7 +80,7 @@ public class CloudEventUtil {
 
     public static boolean validateExtensionType(Object obj) {
         return obj instanceof String || obj instanceof Number || obj 
instanceof Boolean
-                || obj instanceof URI || obj instanceof OffsetDateTime || obj 
instanceof byte[];
+            || obj instanceof URI || obj instanceof OffsetDateTime || obj 
instanceof byte[];
     }
 
 }
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 871ce8cbe..eef0bb2f9 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -36,6 +36,7 @@ dependencies {
     implementation "commons-validator:commons-validator"
 
     implementation project(":eventmesh-common")
+    implementation project(":eventmesh-filter")
     implementation project(":eventmesh-spi")
     implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
     implementation 
project(":eventmesh-storage-plugin:eventmesh-storage-standalone")
@@ -44,6 +45,7 @@ dependencies {
     implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
     implementation 
project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic")
     implementation 
project(":eventmesh-security-plugin:eventmesh-security-auth-token")
+    implementation project(":eventmesh-transformer")
     implementation project(":eventmesh-meta:eventmesh-meta-api")
     implementation project(":eventmesh-meta:eventmesh-meta-nacos")
     implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index 81927d6f4..be19e3d63 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -289,7 +289,7 @@ public class AbstractTCPServer extends 
AbstractRemotingServer {
         }
 
         private void processTcpCommandRequest(final Package pkg, final 
ChannelHandlerContext ctx,
-                                              final long startTime, final 
Command cmd) {
+            final long startTime, final Command cmd) {
 
             Pair<TcpProcessor, ThreadPoolExecutor> pair = 
tcpRequestProcessorTable.get(cmd);
             pair.getObject2().submit(() -> {
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index cff5a6e69..dc51f084a 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -82,6 +82,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
     private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
 
     private final MetaStorage metaStorage;
+
     private final Acl acl;
     private final EventBus eventBus = new EventBus();
 
@@ -89,6 +90,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
     private ProducerManager producerManager;
     private SubscriptionManager subscriptionManager;
 
+    private FilterEngine filterEngine;
+
     private HttpRetryer httpRetryer;
 
     private transient RateLimiter msgRateLimiter;
@@ -132,6 +135,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer 
{
         producerManager = new ProducerManager(this);
         producerManager.init();
 
+        filterEngine = new FilterEngine(metaStorage, producerManager, 
consumerManager);
+
         super.setHandlerService(new HandlerService());
         super.getHandlerService().setMetrics(this.getMetrics());
 
@@ -155,6 +160,10 @@ public class EventMeshHTTPServer extends 
AbstractHTTPServer {
         consumerManager.start();
         producerManager.start();
         httpRetryer.start();
+        // filterEngine depend on metaStorage
+        if (metaStorage.getStarted().get()) {
+            filterEngine.start();
+        }
 
         if (eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable()) {
             this.register();
@@ -169,6 +178,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer 
{
 
         this.getMetrics().shutdown();
 
+        filterEngine.shutdown();
+
         consumerManager.shutdown();
 
         httpClientPool.shutdown();
@@ -339,6 +350,10 @@ public class EventMeshHTTPServer extends 
AbstractHTTPServer {
         return batchRateLimiter;
     }
 
+    public FilterEngine getFilterEngine() {
+        return filterEngine;
+    }
+
     public MetaStorage getMetaStorage() {
         return metaStorage;
     }
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java
new file mode 100644
index 000000000..2053e5d13
--- /dev/null
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.boot;
+
+import org.apache.eventmesh.api.meta.MetaServiceListener;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.utils.LogUtils;
+import org.apache.eventmesh.filter.pattern.Pattern;
+import org.apache.eventmesh.filter.patternbuild.PatternBuilder;
+import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
+import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.meta.MetaStorage;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class FilterEngine {
+
+    /**
+     * key:group-topic
+     **/
+    private final Map<String, Pattern> filterPatternMap = new HashMap<>();
+
+    private final String filterPrefix = "filter-";
+
+    private final MetaStorage metaStorage;
+
+    private MetaServiceListener metaServiceListener;
+
+    private final ProducerManager producerManager;
+
+    private final ConsumerManager consumerManager;
+
+    private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+
+    public FilterEngine(MetaStorage metaStorage, ProducerManager 
producerManager, ConsumerManager consumerManager) {
+        this.metaStorage = metaStorage;
+        this.producerManager = producerManager;
+        this.consumerManager = consumerManager;
+    }
+
+    public void start() {
+        Map<String, String> filterMetaData = 
metaStorage.getMetaData(filterPrefix, true);
+        for (Entry<String, String> filterDataEntry : 
filterMetaData.entrySet()) {
+            // filter-group
+            String key = filterDataEntry.getKey();
+            // topic-filterRule list
+            String value = filterDataEntry.getValue();
+            updateFilterPatternMap(key, value);
+        }
+        metaServiceListener = this::updateFilterPatternMap;
+
+        // addListeners for producerManager & consumerManager
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            ConcurrentHashMap<String, EventMeshProducer> producerMap = 
producerManager.getProducerTable();
+            for (String producerGroup : producerMap.keySet()) {
+                for (String filterKey : filterPatternMap.keySet()) {
+                    if (!StringUtils.contains(filterKey, producerGroup)) {
+                        addFilterListener(producerGroup);
+                        LogUtils.info(log, "addFilterListener for producer 
group: " + producerGroup);
+                    }
+                }
+            }
+            ConcurrentHashMap<String, ConsumerGroupManager> consumerMap = 
consumerManager.getClientTable();
+            for (String consumerGroup : consumerMap.keySet()) {
+                for (String filterKey : filterPatternMap.keySet()) {
+                    if (!StringUtils.contains(filterKey, consumerGroup)) {
+                        addFilterListener(consumerGroup);
+                        LogUtils.info(log, "addFilterListener for consumer 
group: " + consumerGroup);
+                    }
+                }
+            }
+        }, 10_000, 5_000, TimeUnit.MILLISECONDS);
+    }
+
+    private void updateFilterPatternMap(String key, String value) {
+        String group = StringUtils.substringAfter(key, filterPrefix);
+
+        JsonNode filterJsonNodeArray = JsonUtils.getJsonNode(value);
+        if (filterJsonNodeArray != null) {
+            for (JsonNode filterJsonNode : filterJsonNodeArray) {
+                String topic = filterJsonNode.get("topic").asText();
+                String filterCondition = 
filterJsonNode.get("condition").toString();
+                Pattern filterPattern = PatternBuilder.build(filterCondition);
+                filterPatternMap.put(group + "-" + topic, filterPattern);
+            }
+        }
+        addFilterListener(group);
+    }
+
+    public void addFilterListener(String group) {
+        String filterKey = filterPrefix + group;
+        try {
+            metaStorage.getMetaDataWithListener(metaServiceListener, 
filterKey);
+        } catch (Exception e) {
+            throw new RuntimeException("addFilterListener exception", e);
+        }
+    }
+
+    public void shutdown() {
+        scheduledExecutorService.shutdown();
+    }
+
+    public Pattern getFilterPattern(String key) {
+        return filterPatternMap.get(key);
+    }
+}
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index cb96e5433..52f740151 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -29,8 +29,10 @@ import 
org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
 import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
 import org.apache.eventmesh.common.protocol.http.common.RequestURI;
 import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.common.utils.LogUtils;
 import org.apache.eventmesh.common.utils.RandomStringUtils;
+import org.apache.eventmesh.filter.pattern.Pattern;
 import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.acl.Acl;
@@ -158,6 +160,8 @@ public class SendAsyncEventProcessor implements 
AsyncHttpProcessor {
             
event.getExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey())).toString();
         final String topic = event.getSubject();
 
+        Pattern filterPattern = 
eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + 
topic);
+
         // validate body
         if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
             || event.getData() == null) {
@@ -237,41 +241,53 @@ public class SendAsyncEventProcessor implements 
AsyncHttpProcessor {
         eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsg();
 
         final long startTime = System.currentTimeMillis();
-
+        boolean isFiltered = true;
         try {
             event = CloudEventBuilder.from(sendMessageContext.getEvent())
                 .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, 
String.valueOf(System.currentTimeMillis()))
                 .build();
             
handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(),
 event),
                 EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, 
false);
+            if (filterPattern != null) {
+                isFiltered = 
filterPattern.filter(JsonUtils.toJSONString(event));
+            }
 
-            eventMeshProducer.send(sendMessageContext, new SendCallback() {
-
-                @Override
-                public void onSuccess(final SendResult sendResult) {
-                    responseBodyMap.put(EventMeshConstants.RET_CODE, 
EventMeshRetCode.SUCCESS.getRetCode());
-                    responseBodyMap.put(EventMeshConstants.RET_MSG, 
EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);
-
-                    LogUtils.info(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                        System.currentTimeMillis() - startTime, topic, bizNo, 
uniqueId);
-                    
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
-                    handlerSpecific.sendResponse(responseHeaderMap, 
responseBodyMap);
-                }
+            if (isFiltered) {
+                eventMeshProducer.send(sendMessageContext, new SendCallback() {
+
+                    @Override
+                    public void onSuccess(final SendResult sendResult) {
+                        responseBodyMap.put(EventMeshConstants.RET_CODE, 
EventMeshRetCode.SUCCESS.getRetCode());
+                        responseBodyMap.put(EventMeshConstants.RET_MSG, 
EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);
+
+                        LogUtils.info(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+                            System.currentTimeMillis() - startTime, topic, 
bizNo, uniqueId);
+                        
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
+                        handlerSpecific.sendResponse(responseHeaderMap, 
responseBodyMap);
+                    }
+
+                    @Override
+                    public void onException(final OnExceptionContext context) {
+                        responseBodyMap.put(EventMeshConstants.RET_CODE, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
+                        responseBodyMap.put(EventMeshConstants.RET_MSG, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+                            + EventMeshUtil.stackTrace(context.getException(), 
2));
+                        
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
+                        
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
+                            
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), 
sendMessageContext.getEvent()));
+
+                        handlerSpecific.sendResponse(responseHeaderMap, 
responseBodyMap);
+                        LogUtils.error(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+                            System.currentTimeMillis() - startTime, topic, 
bizNo, uniqueId, context.getException());
+                    }
+                });
+            } else {
+                LogUtils.error(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}|apply
 filter failed",
+                    System.currentTimeMillis() - startTime, topic, bizNo, 
uniqueId);
+                
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
+                
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_FILTER_MSG_ERR, 
responseHeaderMap, responseBodyMap,
+                    
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
+            }
 
-                @Override
-                public void onException(final OnExceptionContext context) {
-                    responseBodyMap.put(EventMeshConstants.RET_CODE, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
-                    responseBodyMap.put(EventMeshConstants.RET_MSG, 
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
-                        + EventMeshUtil.stackTrace(context.getException(), 2));
-                    
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
-                    
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
-                        
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), 
sendMessageContext.getEvent()));
-
-                    handlerSpecific.sendResponse(responseHeaderMap, 
responseBodyMap);
-                    LogUtils.error(log, 
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
-                        System.currentTimeMillis() - startTime, topic, bizNo, 
uniqueId, context.getException());
-                }
-            });
         } catch (Exception ex) {
             
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, 
TimeUnit.SECONDS);
             
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
 responseHeaderMap, responseBodyMap, null);
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 9cf01cd48..bee14c992 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -31,6 +31,7 @@ import org.apache.eventmesh.common.utils.IPUtils;
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.common.utils.LogUtils;
 import org.apache.eventmesh.common.utils.RandomStringUtils;
+import org.apache.eventmesh.filter.pattern.Pattern;
 import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
 import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
 import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -126,6 +127,17 @@ public class AsyncHTTPPushRequest extends 
AbstractHTTPPushRequest {
             .withExtension(EventMeshConstants.RSP_URL, currPushUrl)
             .withExtension(EventMeshConstants.RSP_GROUP, 
handleMsgContext.getConsumerGroup())
             .build();
+
+        Pattern filterPattern = eventMeshHTTPServer.getFilterEngine()
+            .getFilterPattern(handleMsgContext.getConsumerGroup() + "-" + 
handleMsgContext.getTopic());
+        if (filterPattern != null) {
+            if (!filterPattern.filter(JsonUtils.toJSONString(event))) {
+                LOGGER.error("apply filter failed, group:{}, topic:{}, 
bizSeqNo={}, uniqueId={}",
+                    this.handleMsgContext.getConsumerGroup(),
+                    this.handleMsgContext.getTopic(), 
this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
+                return;
+            }
+        }
         handleMsgContext.setEvent(event);
         super.setEvent(event);
 
diff --git 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
index c51473253..41da6994f 100644
--- 
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
+++ 
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.runtime.meta;
 
 import org.apache.eventmesh.api.exception.MetaException;
 import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
 import org.apache.eventmesh.api.meta.bo.EventMeshAppSubTopicInfo;
 import org.apache.eventmesh.api.meta.bo.EventMeshServicePubTopicInfo;
 import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
@@ -128,4 +129,20 @@ public class MetaStorage {
     public EventMeshAppSubTopicInfo findEventMeshAppSubTopicInfo(String group) 
throws Exception {
         return metaService.findEventMeshAppSubTopicInfoByGroup(group);
     }
+
+    public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
+        return metaService.getMetaData(key, fuzzyEnabled);
+    }
+
+    public void getMetaDataWithListener(MetaServiceListener 
metaServiceListener, String key) throws Exception {
+        metaService.getMetaDataWithListener(metaServiceListener, key);
+    }
+
+    public AtomicBoolean getInited() {
+        return inited;
+    }
+
+    public AtomicBoolean getStarted() {
+        return started;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to