This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 2fcd09118 [ISSUE #4577] Implement FilterEngine for EventMesh Filters
(#4578)
2fcd09118 is described below
commit 2fcd091189b8ec97ff33e5c1fda0d785f9b5ad76
Author: mike_xwm <[email protected]>
AuthorDate: Tue Nov 28 11:31:35 2023 +0800
[ISSUE #4577] Implement FilterEngine for EventMesh Filters (#4578)
* update ci.yml
* add filters
* add filters
* add filters
* apply filters under http processor
* update filterEngine
* [ISSUE #4577] Implement FilterEngine for EventMesh Filters
* refactor ci.yml
* fix ci check error
* fix ci check error
* fix ci check error
* fix ci check error
* remove duplicate method in PatternBuilder
---
.../protocol/http/common/EventMeshRetCode.java | 2 +
.../org/apache/eventmesh/filter/PatternEntry.java | 13 +-
.../filter/patternbuild/PatternBuilder.java | 26 +---
.../org/apache/eventmesh/api/meta/MetaService.java | 4 +-
.../eventmesh/api/meta/MetaServiceListener.java | 26 ++++
.../meta/consul/service/ConsulMetaService.java | 9 +-
.../meta/etcd/service/EtcdMetaService.java | 9 +-
.../meta/nacos/service/NacosMetaService.java | 96 +++++++++++++-
.../nacos/service/NacosMetaServiceTest.java | 20 ---
.../zookeeper/service/ZookeeperMetaService.java | 9 +-
.../prometheus/metrics/PrometheusGrpcExporter.java | 16 +--
.../prometheus/metrics/PrometheusHttpExporter.java | 142 ++++++++++-----------
.../prometheus/metrics/PrometheusTcpExporter.java | 40 +++---
.../prometheus/utils/PrometheusExporterUtils.java | 2 +-
.../apache/eventmesh/openconnect/SourceWorker.java | 12 +-
.../eventmesh/openconnect/util/CloudEventUtil.java | 2 +-
eventmesh-runtime/build.gradle | 2 +
.../eventmesh/runtime/boot/AbstractTCPServer.java | 2 +-
.../runtime/boot/EventMeshHTTPServer.java | 15 +++
.../eventmesh/runtime/boot/FilterEngine.java | 136 ++++++++++++++++++++
.../http/processor/SendAsyncEventProcessor.java | 70 ++++++----
.../protocol/http/push/AsyncHTTPPushRequest.java | 12 ++
.../apache/eventmesh/runtime/meta/MetaStorage.java | 17 +++
23 files changed, 494 insertions(+), 188 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
index d3c7b067d..5f2177862 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/protocol/http/common/EventMeshRetCode.java
@@ -42,6 +42,8 @@ public enum EventMeshRetCode {
EVENTMESH_HEARTBEAT_ERR(21, "eventMesh heartbeat error"),
EVENTMESH_ACL_ERR(22, "eventMesh acl error"),
EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR(23, "eventMesh http msg send over
the limit"),
+
+ EVENTMESH_FILTER_MSG_ERR(24, "eventMesh filter async msg error"),
EVENTMESH_OPERATE_FAIL(100, "operate fail");
private final Integer retCode;
diff --git
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
index 73878fb65..5a2493a37 100644
---
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
+++
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/PatternEntry.java
@@ -26,26 +26,31 @@ import com.fasterxml.jackson.databind.JsonNode;
public class PatternEntry {
- private final String patternPath;
+ private String patternName;
+
+ private String patternPath;
private final List<Condition> conditionList = new ArrayList<>();
- public PatternEntry(final String patternPath) {
+ public PatternEntry(final String patternName, final String patternPath) {
+ this.patternName = patternName;
this.patternPath = patternPath;
}
- public void addRuleCondition(Condition patternCondition) {
+ public void addCondition(Condition patternCondition) {
this.conditionList.add(patternCondition);
}
public String getPatternName() {
- return "123";
+ return patternName;
}
public String getPatternPath() {
return patternPath;
}
+ // default filter type is OR
+ // todo: extend the filter type with AND
public boolean match(JsonNode jsonElement) {
for (final Condition patternCondition : conditionList) {
if (patternCondition.match(jsonElement)) {
diff --git
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
index 0a2fb0176..5f9a71d26 100644
---
a/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
+++
b/eventmesh-filter/src/main/java/org/apache/eventmesh/filter/patternbuild/PatternBuilder.java
@@ -74,21 +74,7 @@ public class PatternBuilder {
}
// iter all requiredField
- parseRequiredField("$." + key, value, pattern);
- if (value.isEmpty()) {
- // Empty array
- throw new JsonException("INVALID_PATTERN_VALUE ");
- }
- PatternEntry patternEntry = new PatternEntry("$." + key);
- for (final JsonNode objNode : value) {
- // {
- // "suffix":".jpg"
- // }
- Condition condition = parseCondition(objNode);
- patternEntry.addRuleCondition(condition);
- }
-
- pattern.addRequiredFieldList(patternEntry);
+ parseRequiredField(key, "$." + key, value, pattern);
}
return pattern;
@@ -112,12 +98,12 @@ public class PatternBuilder {
String key = entry.getKey();
// [{"anything-but":"initializing"}] [{"anything-but":123}]}
JsonNode value = entry.getValue();
- PatternEntry patternEntry = new PatternEntry(elepath + "." +
key);
+ PatternEntry patternEntry = new PatternEntry(key, elepath +
"." + key);
if (!value.isObject()) {
if (value.isArray()) {
for (JsonNode node11 : value) {
// {"anything-but":"initializing"}
-
patternEntry.addRuleCondition(parseCondition(node11));
+ patternEntry.addCondition(parseCondition(node11));
}
}
pattern.addDataList(patternEntry);
@@ -146,15 +132,15 @@ public class PatternBuilder {
return null;
}
- private static void parseRequiredField(String path, JsonNode jsonNode,
Pattern pattern) {
+ private static void parseRequiredField(String patternName, String
patternPath, JsonNode jsonNode, Pattern pattern) {
if (jsonNode.isEmpty()) {
// Empty array
throw new JsonException("INVALID_PATTERN_VALUE ");
}
- PatternEntry patternEntry = new PatternEntry(path);
+ PatternEntry patternEntry = new PatternEntry(patternName, patternPath);
for (final JsonNode objNode : jsonNode) {
Condition condition = parseCondition(objNode);
- patternEntry.addRuleCondition(condition);
+ patternEntry.addCondition(condition);
}
pattern.addRequiredFieldList(patternEntry);
diff --git
a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
index feb0efe5a..6dcf4fc13 100644
---
a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
+++
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaService.java
@@ -54,7 +54,9 @@ public interface MetaService {
void registerMetadata(Map<String, String> metadataMap);
- String getMetaData(String key);
+ Map<String, String> getMetaData(String key, boolean fuzzyEnabled);
+
+ void getMetaDataWithListener(MetaServiceListener metaServiceListener,
String key);
void updateMetaData(Map<String, String> metadataMap);
diff --git
a/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java
new file mode 100644
index 000000000..b304de780
--- /dev/null
+++
b/eventmesh-meta/eventmesh-meta-api/src/main/java/org/apache/eventmesh/api/meta/MetaServiceListener.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.api.meta;
+
+/**
+ * MetaServiceListener
+ */
+public interface MetaServiceListener {
+
+ void onChange(String key, String value);
+}
diff --git
a/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
b/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
index b2ba4564d..2d9b921ef 100644
---
a/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
+++
b/eventmesh-meta/eventmesh-meta-consul/src/main/java/org/apache/eventmesh/meta/consul/service/ConsulMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.consul.service;
import org.apache.eventmesh.api.exception.MetaException;
import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
@@ -162,6 +163,12 @@ public class ConsulMetaService implements MetaService {
return true;
}
+ // todo: to be implemented
+ @Override
+ public void getMetaDataWithListener(MetaServiceListener
metaServiceListener, String key) {
+
+ }
+
@Override
public List<EventMeshDataInfo> findEventMeshInfoByCluster(String
clusterName) throws MetaException {
HealthServicesRequest request =
HealthServicesRequest.newBuilder().setPassing(true).setToken(token).build();
@@ -192,7 +199,7 @@ public class ConsulMetaService implements MetaService {
}
@Override
- public String getMetaData(String key) {
+ public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
return null;
}
diff --git
a/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
b/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
index 7f0c95507..297900cb9 100644
---
a/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
+++
b/eventmesh-meta/eventmesh-meta-etcd/src/main/java/org/apache/eventmesh/meta/etcd/service/EtcdMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.etcd.service;
import org.apache.eventmesh.api.exception.MetaException;
import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
@@ -196,10 +197,16 @@ public class EtcdMetaService implements MetaService {
}
@Override
- public String getMetaData(String key) {
+ public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
return null;
}
+ // todo: to be implemented
+ @Override
+ public void getMetaDataWithListener(MetaServiceListener
metaServiceListener, String key) {
+
+ }
+
@Override
public void updateMetaData(Map<String, String> metadataMap) {
String etcdMetaKey = instanceIp + "-" + group;
diff --git
a/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
b/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
index 5eaac5a32..104e6a0fb 100644
---
a/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
+++
b/eventmesh-meta/eventmesh-meta-nacos/src/main/java/org/apache/eventmesh/meta/nacos/service/NacosMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.nacos.service;
import org.apache.eventmesh.api.exception.MetaException;
import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.api.meta.config.EventMeshMetaConfig;
import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
@@ -31,9 +32,18 @@ import
org.apache.eventmesh.meta.nacos.config.NacosMetaStorageConfiguration;
import org.apache.eventmesh.meta.nacos.constant.NacosConstant;
import org.apache.commons.lang3.StringUtils;
-
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -41,16 +51,19 @@ import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
+import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.JacksonUtils;
+import com.fasterxml.jackson.databind.JsonNode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -85,6 +98,8 @@ public class NacosMetaService implements MetaService {
private ConcurrentMap<String, EventMeshRegisterInfo>
eventMeshRegisterInfoMap;
+ private MetaServiceListener metaServiceListener;
+
@Override
public void init() throws MetaException {
@@ -173,6 +188,26 @@ public class NacosMetaService implements MetaService {
return properties;
}
+ @Override
+ public void getMetaDataWithListener(MetaServiceListener
metaServiceListener, String key) {
+ try {
+ nacosConfigService.addListener(key, group, new Listener() {
+
+ @Override
+ public Executor getExecutor() {
+ return null;
+ }
+
+ @Override
+ public void receiveConfigInfo(String configInfo) {
+ metaServiceListener.onChange(key, configInfo);
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("add nacos listener for key " + key +
"error", e);
+ }
+ }
+
@Override
public void shutdown() throws MetaException {
if (!initStatus.compareAndSet(true, false)) {
@@ -242,16 +277,67 @@ public class NacosMetaService implements MetaService {
}
}
+ // implement with http
@Override
- public String getMetaData(String key) {
- try {
- return this.nacosConfigService.getConfig(key, group, 5000L);
- } catch (NacosException e) {
+ public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
+ if (fuzzyEnabled) {
+ key = key + "*";
+ }
+ int pageNo = 1;
+ int pageSize = 100;
+
+ Map<String, String> result = new HashMap<>();
+ Map<String, String> tmpMap;
+ do {
+ tmpMap = getResultFromNacos(pageNo, pageSize, key, group,
fuzzyEnabled);
+ result.putAll(tmpMap);
+ } while (!(tmpMap.size() < pageSize));
+ return result;
+ }
+
+ private Map<String, String> getResultFromNacos(int pageNo, int pageSize,
String key, String group, boolean fuzzyEnabled) {
+ Map<String, String> result = new HashMap<>();
+ try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+ URIBuilder uriBuilder = new URIBuilder("http://" + serverAddr +
"/nacos/v1/cs/configs")
+ .setParameter("dataId", key)
+ .setParameter("group", group)
+ .setParameter("pageNo", String.valueOf(pageNo))
+ .setParameter("pageSize", String.valueOf(pageSize));
+ if (fuzzyEnabled) {
+ uriBuilder.setParameter("search", "blur");
+ }
+ URI uri = uriBuilder.build();
+ HttpGet httpGet = new HttpGet(uri);
+ try (CloseableHttpResponse closeableHttpResponse =
httpclient.execute(httpGet)) {
+ if (closeableHttpResponse.getStatusLine().getStatusCode() ==
200) {
+ String response =
EntityUtils.toString(closeableHttpResponse.getEntity(), StandardCharsets.UTF_8);
+ result = processResponse(response);
+ }
+ } catch (Exception e) {
+ log.error("get metaData fail", e);
+ throw new RuntimeException(e);
+ }
+ return result;
+ } catch (Exception e) {
log.error("get metaData fail", e);
throw new RuntimeException(e);
}
}
+ private Map<String, String> processResponse(String response) {
+ Map<String, String> result = new HashMap<>();
+ JsonNode jsonNode = JacksonUtils.toObj(response);
+ JsonNode jsonNodeArray = jsonNode.get("pageItems");
+ if (jsonNodeArray.isArray()) {
+ for (JsonNode js : jsonNodeArray) {
+ String key = js.get("dataId").asText();
+ String value = js.get("content").asText();
+ result.put(key, value);
+ }
+ }
+ return result;
+ }
+
@Override
public void updateMetaData(Map<String, String> metadataMap) {
String protocol =
metadataMap.get(EventMeshMetaConfig.EVENT_MESH_PROTO);
diff --git
a/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
b/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
index 14be0578c..1065ab4be 100644
---
a/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
+++
b/eventmesh-meta/eventmesh-meta-nacos/src/test/java/org/apache/eventmesh/registry/nacos/service/NacosMetaServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.eventmesh.registry.nacos.service;
import static org.apache.eventmesh.common.Constants.HTTP;
-import org.apache.eventmesh.api.exception.MetaException;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.config.CommonConfiguration;
@@ -106,23 +105,4 @@ public class NacosMetaServiceTest {
Assertions.assertFalse((Boolean.parseBoolean(initStatusField.toString())));
Assertions.assertFalse((Boolean.parseBoolean(startStatusField.toString())));
}
-
- @Test
- public void testRegister() {
- Assertions.assertThrows(MetaException.class, () -> {
- nacosMetaService.init();
- nacosMetaService.start();
- nacosMetaService.register(eventMeshRegisterInfo);
- });
- }
-
- @Test
- public void testUnRegister() {
- Assertions.assertThrows(MetaException.class, () -> {
- nacosMetaService.init();
- nacosMetaService.start();
- nacosMetaService.unRegister(eventMeshUnRegisterInfo);
- });
- }
-
}
diff --git
a/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
b/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
index 4dba3effe..5cee22d9c 100644
---
a/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
+++
b/eventmesh-meta/eventmesh-meta-zookeeper/src/main/java/org/apache/eventmesh/meta/zookeeper/service/ZookeeperMetaService.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.meta.zookeeper.service;
import org.apache.eventmesh.api.exception.MetaException;
import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo;
@@ -288,10 +289,16 @@ public class ZookeeperMetaService implements MetaService {
}
@Override
- public String getMetaData(String key) {
+ public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
return null;
}
+ // todo: to be implemented
+ @Override
+ public void getMetaDataWithListener(MetaServiceListener
metaServiceListener, String key) {
+
+ }
+
@Override
public void updateMetaData(Map<String, String> metadataMap) {
diff --git
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
index 6d26197a2..ca61f9add 100644
---
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusGrpcExporter.java
@@ -41,14 +41,14 @@ public class PrometheusGrpcExporter {
* Map structure : [metric name, description of name] -> the method of get
corresponding metric.
*/
private final Map<String[], Function<GrpcSummaryMetrics, Number>>
paramPairs = ImmutableMap
- .<String[], Function<GrpcSummaryMetrics, Number>>builder()
- .put(join("sub.topic.num", "get sub topic num."),
GrpcSummaryMetrics::getSubscribeTopicNum)
- .put(join("retry.queue.size", "get size of retry queue."),
GrpcSummaryMetrics::getRetrySize)
- .put(join("server.tps", "get size of retry queue."),
GrpcSummaryMetrics::getClient2EventMeshTPS)
- .put(join("client.tps", "get tps of eventMesh to mq."),
GrpcSummaryMetrics::getEventMesh2ClientTPS)
- .put(join("mq.provider.tps", "get tps of eventMesh to mq."),
GrpcSummaryMetrics::getEventMesh2MqTPS)
- .put(join("mq.consumer.tps", "get tps of eventMesh to mq."),
GrpcSummaryMetrics::getMq2EventMeshTPS)
- .build();
+ .<String[], Function<GrpcSummaryMetrics, Number>>builder()
+ .put(join("sub.topic.num", "get sub topic num."),
GrpcSummaryMetrics::getSubscribeTopicNum)
+ .put(join("retry.queue.size", "get size of retry queue."),
GrpcSummaryMetrics::getRetrySize)
+ .put(join("server.tps", "get size of retry queue."),
GrpcSummaryMetrics::getClient2EventMeshTPS)
+ .put(join("client.tps", "get tps of eventMesh to mq."),
GrpcSummaryMetrics::getEventMesh2ClientTPS)
+ .put(join("mq.provider.tps", "get tps of eventMesh to mq."),
GrpcSummaryMetrics::getEventMesh2MqTPS)
+ .put(join("mq.consumer.tps", "get tps of eventMesh to mq."),
GrpcSummaryMetrics::getMq2EventMeshTPS)
+ .build();
public static void export(final String meterName, final GrpcSummaryMetrics
summaryMetrics) {
final Meter meter = GlobalMeterProvider.getMeter(meterName);
diff --git
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
index fc5a65417..a09cb074d 100644
---
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusHttpExporter.java
@@ -40,81 +40,81 @@ public class PrometheusHttpExporter {
* Map structure : [metric name, description of name] -> the method of get
corresponding metric.
*/
private final Map<String[], Function<HttpSummaryMetrics, Number>>
paramPairs = ImmutableMap
- .<String[], Function<HttpSummaryMetrics, Number>>builder()
- // maxHTTPTPS
- .put(join("eventmesh.http.request.tps.max", "max TPS of HTTP."),
HttpSummaryMetrics::maxHTTPTPS)
- // avgHTTPTPS
- .put(join("eventmesh.http.request.tps.avg", "avg TPS of HTTP."),
HttpSummaryMetrics::avgHTTPTPS)
- // maxHTTPCost
- .put(join("eventmesh.http.request.cost.max", "max cost of HTTP."),
HttpSummaryMetrics::maxHTTPCost)
- // avgHTTPCost
- .put(join("eventmesh.http.request.cost.avg", "avg cost of HTTP."),
HttpSummaryMetrics::avgHTTPCost)
- // avgHTTPBodyDecodeCost
- .put(join("eventmesh.http.body.decode.cost.avg", "avg body decode
cost of HTTP."), HttpSummaryMetrics::avgHTTPBodyDecodeCost)
- // httpDiscard
- .put(join("eventmesh.http.request.discard.num", "http request
discard num."), HttpSummaryMetrics::getHttpDiscard)
- // maxBatchSendMsgTPS
- .put(join("eventmesh.batch.send.message.tps.max", "max of batch
send message tps."), HttpSummaryMetrics::maxSendBatchMsgTPS)
- // avgBatchSendMsgTPS
- .put(join("eventmesh.batch.send.message.tps.avg", "avg of batch
send message tps."), HttpSummaryMetrics::avgSendBatchMsgTPS)
- // sum
- .put(join("eventmesh.batch.send.message.num", "sum of batch send
message number."), HttpSummaryMetrics::getSendBatchMsgNumSum)
- // sumFail
- .put(join("eventmesh.batch.send.message.fail.num", "sum of batch
send message fail message number."),
- HttpSummaryMetrics::getSendBatchMsgFailNumSum)
- // sumFailRate
- .put(join("eventmesh.batch.send.message.fail.rate", "send batch
message fail rate."), HttpSummaryMetrics::getSendBatchMsgFailRate)
- // discard
- .put(join("eventmesh.batch.send.message.discard.num", "sum of send
batch message discard number."),
- HttpSummaryMetrics::getSendBatchMsgDiscardNumSum)
- // maxSendMsgTPS
- .put(join("eventmesh.send.message.tps.max", "max of send message
tps."), HttpSummaryMetrics::maxSendMsgTPS)
- // avgSendMsgTPS
- .put(join("eventmesh.send.message.tps.avg", "avg of send message
tps."), HttpSummaryMetrics::avgSendMsgTPS)
- // sum
- .put(join("eventmesh.send.message.num", "sum of send message
number."), HttpSummaryMetrics::getSendMsgNumSum)
- // sumFail
- .put(join("eventmesh.send.message.fail.num", "sum of send message
fail number."), HttpSummaryMetrics::getSendMsgFailNumSum)
- // sumFailRate
- .put(join("eventmesh.send.message.fail.rate", "send message fail
rate."), HttpSummaryMetrics::getSendMsgFailRate)
- // replyMsg
- .put(join("eventmesh.reply.message.num", "sum of reply message
number."), HttpSummaryMetrics::getReplyMsgNumSum)
- // replyFail
- .put(join("eventmesh.reply.message.fail.num", "sum of reply
message fail number."), HttpSummaryMetrics::getReplyMsgFailNumSum)
- // maxPushMsgTPS
- .put(join("eventmesh.push.message.tps.max", "max of push message
tps."), HttpSummaryMetrics::maxPushMsgTPS)
- // avgPushMsgTPS
- .put(join("eventmesh.push.message.tps.avg", "avg of push message
tps."), HttpSummaryMetrics::avgPushMsgTPS)
- // sum
- .put(join("eventmesh.http.push.message.num", "sum of http push
message number."), HttpSummaryMetrics::getHttpPushMsgNumSum)
- // sumFail
- .put(join("eventmesh.http.push.message.fail.num", "sum of http
push message fail number."), HttpSummaryMetrics::getHttpPushFailNumSum)
- // sumFailRate
- .put(join("eventmesh.http.push.message.fail.rate", "http push
message fail rate."), HttpSummaryMetrics::getHttpPushMsgFailRate)
- // maxClientLatency
- .put(join("eventmesh.http.push.latency.max", "max of http push
latency."), HttpSummaryMetrics::maxHTTPPushLatency)
- // avgClientLatency
- .put(join("eventmesh.http.push.latency.avg", "avg of http push
latency."), HttpSummaryMetrics::avgHTTPPushLatency)
- // batchMsgQ
- .put(join("eventmesh.batch.message.queue.size", "size of batch
message queue."), HttpSummaryMetrics::getBatchMsgQueueSize)
- // sendMsgQ
- .put(join("eventmesh.send.message.queue.size", "size of send
message queue."), HttpSummaryMetrics::getSendMsgQueueSize)
- // pushMsgQ
- .put(join("eventmesh.push.message.queue.size", "size of push
message queue."), HttpSummaryMetrics::getPushMsgQueueSize)
- // httpRetryQ
- .put(join("eventmesh.http.retry.queue.size", "size of http retry
queue."), HttpSummaryMetrics::getHttpRetryQueueSize)
- // batchAvgSend2MQCost
- .put(join("eventmesh.batch.send.message.cost.avg", "avg of batch
send message cost."), HttpSummaryMetrics::avgBatchSendMsgCost)
- // avgSend2MQCost
- .put(join("eventmesh.send.message.cost.avg", "avg of send message
cost."), HttpSummaryMetrics::avgSendMsgCost)
- // avgReply2MQCost
- .put(join("eventmesh.reply.message.cost.avg", "avg of reply
message cost."), HttpSummaryMetrics::avgReplyMsgCost)
- .build();
+ .<String[], Function<HttpSummaryMetrics, Number>>builder()
+ // maxHTTPTPS
+ .put(join("eventmesh.http.request.tps.max", "max TPS of HTTP."),
HttpSummaryMetrics::maxHTTPTPS)
+ // avgHTTPTPS
+ .put(join("eventmesh.http.request.tps.avg", "avg TPS of HTTP."),
HttpSummaryMetrics::avgHTTPTPS)
+ // maxHTTPCost
+ .put(join("eventmesh.http.request.cost.max", "max cost of HTTP."),
HttpSummaryMetrics::maxHTTPCost)
+ // avgHTTPCost
+ .put(join("eventmesh.http.request.cost.avg", "avg cost of HTTP."),
HttpSummaryMetrics::avgHTTPCost)
+ // avgHTTPBodyDecodeCost
+ .put(join("eventmesh.http.body.decode.cost.avg", "avg body decode cost
of HTTP."), HttpSummaryMetrics::avgHTTPBodyDecodeCost)
+ // httpDiscard
+ .put(join("eventmesh.http.request.discard.num", "http request discard
num."), HttpSummaryMetrics::getHttpDiscard)
+ // maxBatchSendMsgTPS
+ .put(join("eventmesh.batch.send.message.tps.max", "max of batch send
message tps."), HttpSummaryMetrics::maxSendBatchMsgTPS)
+ // avgBatchSendMsgTPS
+ .put(join("eventmesh.batch.send.message.tps.avg", "avg of batch send
message tps."), HttpSummaryMetrics::avgSendBatchMsgTPS)
+ // sum
+ .put(join("eventmesh.batch.send.message.num", "sum of batch send
message number."), HttpSummaryMetrics::getSendBatchMsgNumSum)
+ // sumFail
+ .put(join("eventmesh.batch.send.message.fail.num", "sum of batch send
message fail message number."),
+ HttpSummaryMetrics::getSendBatchMsgFailNumSum)
+ // sumFailRate
+ .put(join("eventmesh.batch.send.message.fail.rate", "send batch
message fail rate."), HttpSummaryMetrics::getSendBatchMsgFailRate)
+ // discard
+ .put(join("eventmesh.batch.send.message.discard.num", "sum of send
batch message discard number."),
+ HttpSummaryMetrics::getSendBatchMsgDiscardNumSum)
+ // maxSendMsgTPS
+ .put(join("eventmesh.send.message.tps.max", "max of send message
tps."), HttpSummaryMetrics::maxSendMsgTPS)
+ // avgSendMsgTPS
+ .put(join("eventmesh.send.message.tps.avg", "avg of send message
tps."), HttpSummaryMetrics::avgSendMsgTPS)
+ // sum
+ .put(join("eventmesh.send.message.num", "sum of send message
number."), HttpSummaryMetrics::getSendMsgNumSum)
+ // sumFail
+ .put(join("eventmesh.send.message.fail.num", "sum of send message fail
number."), HttpSummaryMetrics::getSendMsgFailNumSum)
+ // sumFailRate
+ .put(join("eventmesh.send.message.fail.rate", "send message fail
rate."), HttpSummaryMetrics::getSendMsgFailRate)
+ // replyMsg
+ .put(join("eventmesh.reply.message.num", "sum of reply message
number."), HttpSummaryMetrics::getReplyMsgNumSum)
+ // replyFail
+ .put(join("eventmesh.reply.message.fail.num", "sum of reply message
fail number."), HttpSummaryMetrics::getReplyMsgFailNumSum)
+ // maxPushMsgTPS
+ .put(join("eventmesh.push.message.tps.max", "max of push message
tps."), HttpSummaryMetrics::maxPushMsgTPS)
+ // avgPushMsgTPS
+ .put(join("eventmesh.push.message.tps.avg", "avg of push message
tps."), HttpSummaryMetrics::avgPushMsgTPS)
+ // sum
+ .put(join("eventmesh.http.push.message.num", "sum of http push message
number."), HttpSummaryMetrics::getHttpPushMsgNumSum)
+ // sumFail
+ .put(join("eventmesh.http.push.message.fail.num", "sum of http push
message fail number."), HttpSummaryMetrics::getHttpPushFailNumSum)
+ // sumFailRate
+ .put(join("eventmesh.http.push.message.fail.rate", "http push message
fail rate."), HttpSummaryMetrics::getHttpPushMsgFailRate)
+ // maxClientLatency
+ .put(join("eventmesh.http.push.latency.max", "max of http push
latency."), HttpSummaryMetrics::maxHTTPPushLatency)
+ // avgClientLatency
+ .put(join("eventmesh.http.push.latency.avg", "avg of http push
latency."), HttpSummaryMetrics::avgHTTPPushLatency)
+ // batchMsgQ
+ .put(join("eventmesh.batch.message.queue.size", "size of batch message
queue."), HttpSummaryMetrics::getBatchMsgQueueSize)
+ // sendMsgQ
+ .put(join("eventmesh.send.message.queue.size", "size of send message
queue."), HttpSummaryMetrics::getSendMsgQueueSize)
+ // pushMsgQ
+ .put(join("eventmesh.push.message.queue.size", "size of push message
queue."), HttpSummaryMetrics::getPushMsgQueueSize)
+ // httpRetryQ
+ .put(join("eventmesh.http.retry.queue.size", "size of http retry
queue."), HttpSummaryMetrics::getHttpRetryQueueSize)
+ // batchAvgSend2MQCost
+ .put(join("eventmesh.batch.send.message.cost.avg", "avg of batch send
message cost."), HttpSummaryMetrics::avgBatchSendMsgCost)
+ // avgSend2MQCost
+ .put(join("eventmesh.send.message.cost.avg", "avg of send message
cost."), HttpSummaryMetrics::avgSendMsgCost)
+ // avgReply2MQCost
+ .put(join("eventmesh.reply.message.cost.avg", "avg of reply message
cost."), HttpSummaryMetrics::avgReplyMsgCost)
+ .build();
public void export(String name, HttpSummaryMetrics summaryMetrics) {
Meter meter = GlobalMeterProvider.getMeter(name);
paramPairs.forEach((metricInfo, getMetric) -> observeOfValue(meter,
metricInfo[0], metricInfo[1],
- HTTP, summaryMetrics, getMetric, HttpSummaryMetrics.class));
+ HTTP, summaryMetrics, getMetric, HttpSummaryMetrics.class));
}
}
diff --git
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
index a8eff9de4..ff9eef9a4 100644
---
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/metrics/PrometheusTcpExporter.java
@@ -41,29 +41,29 @@ public class PrometheusTcpExporter {
* Map structure : [metric name, description of name] -> the method of get
corresponding metric.
*/
private final Map<String[], Function<TcpSummaryMetrics, Number>>
paramPairs = ImmutableMap
- .<String[], Function<TcpSummaryMetrics, Number>>builder()
- // retryQueueSize
- .put(join("retry.queue.size", "get size of retry queue."),
TcpSummaryMetrics::getRetrySize)
- // client2eventMeshTPS
- .put(join("server.tps", "get tps of client to eventMesh."),
TcpSummaryMetrics::getClient2eventMeshTPS)
- // eventMesh2mqTPS
- .put(join("mq.provider.tps", "get tps of eventMesh to mq."),
TcpSummaryMetrics::getEventMesh2mqTPS)
- // mq2eventMeshTPS
- .put(join("mq.consumer.tps", "get tps of mq to eventMesh."),
TcpSummaryMetrics::getMq2eventMeshTPS)
- // eventMesh2clientTPS
- .put(join("client.tps", "get tps of eventMesh to client."),
TcpSummaryMetrics::getEventMesh2clientTPS)
- // allTPS
- .put(join("all.tps", "get all TPS."), TcpSummaryMetrics::getAllTPS)
- // EventMeshTcpConnectionHandler.connections
- .put(join("connection.num",
"EventMeshTcpConnectionHandler.connections."),
TcpSummaryMetrics::getAllConnections)
- // subTopicNum
- .put(join("sub.topic.num", "get sub topic num."),
TcpSummaryMetrics::getSubTopicNum)
- .build();
+ .<String[], Function<TcpSummaryMetrics, Number>>builder()
+ // retryQueueSize
+ .put(join("retry.queue.size", "get size of retry queue."),
TcpSummaryMetrics::getRetrySize)
+ // client2eventMeshTPS
+ .put(join("server.tps", "get tps of client to eventMesh."),
TcpSummaryMetrics::getClient2eventMeshTPS)
+ // eventMesh2mqTPS
+ .put(join("mq.provider.tps", "get tps of eventMesh to mq."),
TcpSummaryMetrics::getEventMesh2mqTPS)
+ // mq2eventMeshTPS
+ .put(join("mq.consumer.tps", "get tps of mq to eventMesh."),
TcpSummaryMetrics::getMq2eventMeshTPS)
+ // eventMesh2clientTPS
+ .put(join("client.tps", "get tps of eventMesh to client."),
TcpSummaryMetrics::getEventMesh2clientTPS)
+ // allTPS
+ .put(join("all.tps", "get all TPS."), TcpSummaryMetrics::getAllTPS)
+ // EventMeshTcpConnectionHandler.connections
+ .put(join("connection.num",
"EventMeshTcpConnectionHandler.connections."),
TcpSummaryMetrics::getAllConnections)
+ // subTopicNum
+ .put(join("sub.topic.num", "get sub topic num."),
TcpSummaryMetrics::getSubTopicNum)
+ .build();
public void export(final String meterName, final TcpSummaryMetrics
summaryMetrics) {
final Meter meter = GlobalMeterProvider.getMeter(meterName);
paramPairs.forEach(
- (metricInfo, getMetric) -> observeOfValue(meter,
METRICS_TCP_PREFIX + metricInfo[0], metricInfo[1],
- TCP, summaryMetrics, getMetric,
TcpSummaryMetrics.class));
+ (metricInfo, getMetric) -> observeOfValue(meter,
METRICS_TCP_PREFIX + metricInfo[0], metricInfo[1],
+ TCP, summaryMetrics, getMetric, TcpSummaryMetrics.class));
}
}
diff --git
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
index 7f17606ad..1a772020c 100644
---
a/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
+++
b/eventmesh-metrics-plugin/eventmesh-metrics-prometheus/src/main/java/org/apache/eventmesh/metrics/prometheus/utils/PrometheusExporterUtils.java
@@ -70,7 +70,7 @@ public class PrometheusExporterUtils {
* @return
*/
public static String[] join(String metricName, String desc) {
- return new String[] {metricName, desc};
+ return new String[]{metricName, desc};
}
}
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
index d95f9c189..71c3fea4d 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java
@@ -237,12 +237,12 @@ public class SourceWorker implements ConnectorWorker {
CloudEventBuilder cloudEventBuilder = CloudEventBuilder.v1();
cloudEventBuilder.withId(UUID.randomUUID().toString())
- .withSubject(config.getPubSubConfig().getSubject())
- .withSource(URI.create("/"))
- .withDataContentType("application/cloudevents+json")
- .withType(CLOUD_EVENTS_PROTOCOL_NAME)
-
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
- .withExtension("ttl", 10000);
+ .withSubject(config.getPubSubConfig().getSubject())
+ .withSource(URI.create("/"))
+ .withDataContentType("application/cloudevents+json")
+ .withType(CLOUD_EVENTS_PROTOCOL_NAME)
+
.withData(Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData())).getBytes(StandardCharsets.UTF_8))
+ .withExtension("ttl", 10000);
for (String key : connectRecord.getExtensions().keySet()) {
if
(CloudEventUtil.validateExtensionType(connectRecord.getExtensionObj(key))) {
diff --git
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
index 54be8acc0..5691d43fd 100644
---
a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
+++
b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/util/CloudEventUtil.java
@@ -80,7 +80,7 @@ public class CloudEventUtil {
public static boolean validateExtensionType(Object obj) {
return obj instanceof String || obj instanceof Number || obj
instanceof Boolean
- || obj instanceof URI || obj instanceof OffsetDateTime || obj
instanceof byte[];
+ || obj instanceof URI || obj instanceof OffsetDateTime || obj
instanceof byte[];
}
}
diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle
index 871ce8cbe..eef0bb2f9 100644
--- a/eventmesh-runtime/build.gradle
+++ b/eventmesh-runtime/build.gradle
@@ -36,6 +36,7 @@ dependencies {
implementation "commons-validator:commons-validator"
implementation project(":eventmesh-common")
+ implementation project(":eventmesh-filter")
implementation project(":eventmesh-spi")
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
implementation
project(":eventmesh-storage-plugin:eventmesh-storage-standalone")
@@ -44,6 +45,7 @@ dependencies {
implementation project(":eventmesh-security-plugin:eventmesh-security-acl")
implementation
project(":eventmesh-security-plugin:eventmesh-security-auth-http-basic")
implementation
project(":eventmesh-security-plugin:eventmesh-security-auth-token")
+ implementation project(":eventmesh-transformer")
implementation project(":eventmesh-meta:eventmesh-meta-api")
implementation project(":eventmesh-meta:eventmesh-meta-nacos")
implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api")
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
index 81927d6f4..be19e3d63 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/AbstractTCPServer.java
@@ -289,7 +289,7 @@ public class AbstractTCPServer extends
AbstractRemotingServer {
}
private void processTcpCommandRequest(final Package pkg, final
ChannelHandlerContext ctx,
- final long startTime, final
Command cmd) {
+ final long startTime, final Command cmd) {
Pair<TcpProcessor, ThreadPoolExecutor> pair =
tcpRequestProcessorTable.get(cmd);
pair.getObject2().submit(() -> {
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index cff5a6e69..dc51f084a 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -82,6 +82,7 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private final EventMeshHTTPConfiguration eventMeshHttpConfiguration;
private final MetaStorage metaStorage;
+
private final Acl acl;
private final EventBus eventBus = new EventBus();
@@ -89,6 +90,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
private ProducerManager producerManager;
private SubscriptionManager subscriptionManager;
+ private FilterEngine filterEngine;
+
private HttpRetryer httpRetryer;
private transient RateLimiter msgRateLimiter;
@@ -132,6 +135,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer
{
producerManager = new ProducerManager(this);
producerManager.init();
+ filterEngine = new FilterEngine(metaStorage, producerManager,
consumerManager);
+
super.setHandlerService(new HandlerService());
super.getHandlerService().setMetrics(this.getMetrics());
@@ -155,6 +160,10 @@ public class EventMeshHTTPServer extends
AbstractHTTPServer {
consumerManager.start();
producerManager.start();
httpRetryer.start();
+ // filterEngine depend on metaStorage
+ if (metaStorage.getStarted().get()) {
+ filterEngine.start();
+ }
if (eventMeshHttpConfiguration.isEventMeshServerMetaStorageEnable()) {
this.register();
@@ -169,6 +178,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer
{
this.getMetrics().shutdown();
+ filterEngine.shutdown();
+
consumerManager.shutdown();
httpClientPool.shutdown();
@@ -339,6 +350,10 @@ public class EventMeshHTTPServer extends
AbstractHTTPServer {
return batchRateLimiter;
}
+ public FilterEngine getFilterEngine() {
+ return filterEngine;
+ }
+
public MetaStorage getMetaStorage() {
return metaStorage;
}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java
new file mode 100644
index 000000000..2053e5d13
--- /dev/null
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/FilterEngine.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.boot;
+
+import org.apache.eventmesh.api.meta.MetaServiceListener;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.utils.LogUtils;
+import org.apache.eventmesh.filter.pattern.Pattern;
+import org.apache.eventmesh.filter.patternbuild.PatternBuilder;
+import
org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
+import
org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.meta.MetaStorage;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class FilterEngine {
+
+ /**
+ * key:group-topic
+ **/
+ private final Map<String, Pattern> filterPatternMap = new HashMap<>();
+
+ private final String filterPrefix = "filter-";
+
+ private final MetaStorage metaStorage;
+
+ private MetaServiceListener metaServiceListener;
+
+ private final ProducerManager producerManager;
+
+ private final ConsumerManager consumerManager;
+
+ private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+
+ public FilterEngine(MetaStorage metaStorage, ProducerManager
producerManager, ConsumerManager consumerManager) {
+ this.metaStorage = metaStorage;
+ this.producerManager = producerManager;
+ this.consumerManager = consumerManager;
+ }
+
+ public void start() {
+ Map<String, String> filterMetaData =
metaStorage.getMetaData(filterPrefix, true);
+ for (Entry<String, String> filterDataEntry :
filterMetaData.entrySet()) {
+ // filter-group
+ String key = filterDataEntry.getKey();
+ // topic-filterRule list
+ String value = filterDataEntry.getValue();
+ updateFilterPatternMap(key, value);
+ }
+ metaServiceListener = this::updateFilterPatternMap;
+
+ // addListeners for producerManager & consumerManager
+ scheduledExecutorService.scheduleAtFixedRate(() -> {
+ ConcurrentHashMap<String, EventMeshProducer> producerMap =
producerManager.getProducerTable();
+ for (String producerGroup : producerMap.keySet()) {
+ for (String filterKey : filterPatternMap.keySet()) {
+ if (!StringUtils.contains(filterKey, producerGroup)) {
+ addFilterListener(producerGroup);
+ LogUtils.info(log, "addFilterListener for producer
group: " + producerGroup);
+ }
+ }
+ }
+ ConcurrentHashMap<String, ConsumerGroupManager> consumerMap =
consumerManager.getClientTable();
+ for (String consumerGroup : consumerMap.keySet()) {
+ for (String filterKey : filterPatternMap.keySet()) {
+ if (!StringUtils.contains(filterKey, consumerGroup)) {
+ addFilterListener(consumerGroup);
+ LogUtils.info(log, "addFilterListener for consumer
group: " + consumerGroup);
+ }
+ }
+ }
+ }, 10_000, 5_000, TimeUnit.MILLISECONDS);
+ }
+
+ private void updateFilterPatternMap(String key, String value) {
+ String group = StringUtils.substringAfter(key, filterPrefix);
+
+ JsonNode filterJsonNodeArray = JsonUtils.getJsonNode(value);
+ if (filterJsonNodeArray != null) {
+ for (JsonNode filterJsonNode : filterJsonNodeArray) {
+ String topic = filterJsonNode.get("topic").asText();
+ String filterCondition =
filterJsonNode.get("condition").toString();
+ Pattern filterPattern = PatternBuilder.build(filterCondition);
+ filterPatternMap.put(group + "-" + topic, filterPattern);
+ }
+ }
+ addFilterListener(group);
+ }
+
+ public void addFilterListener(String group) {
+ String filterKey = filterPrefix + group;
+ try {
+ metaStorage.getMetaDataWithListener(metaServiceListener,
filterKey);
+ } catch (Exception e) {
+ throw new RuntimeException("addFilterListener exception", e);
+ }
+ }
+
+ public void shutdown() {
+ scheduledExecutorService.shutdown();
+ }
+
+ public Pattern getFilterPattern(String key) {
+ return filterPatternMap.get(key);
+ }
+}
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index cb96e5433..52f740151 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -29,8 +29,10 @@ import
org.apache.eventmesh.common.protocol.http.common.EventMeshRetCode;
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
import org.apache.eventmesh.common.protocol.http.common.RequestURI;
import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
+import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
@@ -158,6 +160,8 @@ public class SendAsyncEventProcessor implements
AsyncHttpProcessor {
event.getExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey())).toString();
final String topic = event.getSubject();
+ Pattern filterPattern =
eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" +
topic);
+
// validate body
if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
|| event.getData() == null) {
@@ -237,41 +241,53 @@ public class SendAsyncEventProcessor implements
AsyncHttpProcessor {
eventMeshHTTPServer.getMetrics().getSummaryMetrics().recordSendMsg();
final long startTime = System.currentTimeMillis();
-
+ boolean isFiltered = true;
try {
event = CloudEventBuilder.from(sendMessageContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.build();
handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(),
event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN,
false);
+ if (filterPattern != null) {
+ isFiltered =
filterPattern.filter(JsonUtils.toJSONString(event));
+ }
- eventMeshProducer.send(sendMessageContext, new SendCallback() {
-
- @Override
- public void onSuccess(final SendResult sendResult) {
- responseBodyMap.put(EventMeshConstants.RET_CODE,
EventMeshRetCode.SUCCESS.getRetCode());
- responseBodyMap.put(EventMeshConstants.RET_MSG,
EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);
-
- LogUtils.info(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- System.currentTimeMillis() - startTime, topic, bizNo,
uniqueId);
-
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
- handlerSpecific.sendResponse(responseHeaderMap,
responseBodyMap);
- }
+ if (isFiltered) {
+ eventMeshProducer.send(sendMessageContext, new SendCallback() {
+
+ @Override
+ public void onSuccess(final SendResult sendResult) {
+ responseBodyMap.put(EventMeshConstants.RET_CODE,
EventMeshRetCode.SUCCESS.getRetCode());
+ responseBodyMap.put(EventMeshConstants.RET_MSG,
EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);
+
+ LogUtils.info(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+ System.currentTimeMillis() - startTime, topic,
bizNo, uniqueId);
+
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
+ handlerSpecific.sendResponse(responseHeaderMap,
responseBodyMap);
+ }
+
+ @Override
+ public void onException(final OnExceptionContext context) {
+ responseBodyMap.put(EventMeshConstants.RET_CODE,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
+ responseBodyMap.put(EventMeshConstants.RET_MSG,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ + EventMeshUtil.stackTrace(context.getException(),
2));
+
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
+
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
+
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(),
sendMessageContext.getEvent()));
+
+ handlerSpecific.sendResponse(responseHeaderMap,
responseBodyMap);
+ LogUtils.error(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
+ System.currentTimeMillis() - startTime, topic,
bizNo, uniqueId, context.getException());
+ }
+ });
+ } else {
+ LogUtils.error(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}|apply
filter failed",
+ System.currentTimeMillis() - startTime, topic, bizNo,
uniqueId);
+
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
+
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_FILTER_MSG_ERR,
responseHeaderMap, responseBodyMap,
+
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
+ }
- @Override
- public void onException(final OnExceptionContext context) {
- responseBodyMap.put(EventMeshConstants.RET_CODE,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
- responseBodyMap.put(EventMeshConstants.RET_MSG,
EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
- + EventMeshUtil.stackTrace(context.getException(), 2));
-
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
-
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
-
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(),
sendMessageContext.getEvent()));
-
- handlerSpecific.sendResponse(responseHeaderMap,
responseBodyMap);
- LogUtils.error(log,
"message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
- System.currentTimeMillis() - startTime, topic, bizNo,
uniqueId, context.getException());
- }
- });
} catch (Exception ex) {
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10,
TimeUnit.SECONDS);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR,
responseHeaderMap, responseBodyMap, null);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index 9cf01cd48..bee14c992 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -31,6 +31,7 @@ import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.common.utils.RandomStringUtils;
+import org.apache.eventmesh.filter.pattern.Pattern;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
@@ -126,6 +127,17 @@ public class AsyncHTTPPushRequest extends
AbstractHTTPPushRequest {
.withExtension(EventMeshConstants.RSP_URL, currPushUrl)
.withExtension(EventMeshConstants.RSP_GROUP,
handleMsgContext.getConsumerGroup())
.build();
+
+ Pattern filterPattern = eventMeshHTTPServer.getFilterEngine()
+ .getFilterPattern(handleMsgContext.getConsumerGroup() + "-" +
handleMsgContext.getTopic());
+ if (filterPattern != null) {
+ if (!filterPattern.filter(JsonUtils.toJSONString(event))) {
+ LOGGER.error("apply filter failed, group:{}, topic:{},
bizSeqNo={}, uniqueId={}",
+ this.handleMsgContext.getConsumerGroup(),
+ this.handleMsgContext.getTopic(),
this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
+ return;
+ }
+ }
handleMsgContext.setEvent(event);
super.setEvent(event);
diff --git
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
index c51473253..41da6994f 100644
---
a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
+++
b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/meta/MetaStorage.java
@@ -19,6 +19,7 @@ package org.apache.eventmesh.runtime.meta;
import org.apache.eventmesh.api.exception.MetaException;
import org.apache.eventmesh.api.meta.MetaService;
+import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.api.meta.bo.EventMeshAppSubTopicInfo;
import org.apache.eventmesh.api.meta.bo.EventMeshServicePubTopicInfo;
import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo;
@@ -128,4 +129,20 @@ public class MetaStorage {
public EventMeshAppSubTopicInfo findEventMeshAppSubTopicInfo(String group)
throws Exception {
return metaService.findEventMeshAppSubTopicInfoByGroup(group);
}
+
+ public Map<String, String> getMetaData(String key, boolean fuzzyEnabled) {
+ return metaService.getMetaData(key, fuzzyEnabled);
+ }
+
+ public void getMetaDataWithListener(MetaServiceListener
metaServiceListener, String key) throws Exception {
+ metaService.getMetaDataWithListener(metaServiceListener, key);
+ }
+
+ public AtomicBoolean getInited() {
+ return inited;
+ }
+
+ public AtomicBoolean getStarted() {
+ return started;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]