This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a717d3503 [WIP][ISSUE #4043]initial implementation of filter and
transform (#4365)
a717d3503 is described below
commit a717d3503af42de7c8757cbc4627caa71a9ee23a
Author: FanYang <[email protected]>
AuthorDate: Wed Oct 25 16:17:07 2023 +0800
[WIP][ISSUE #4043]initial implementation of filter and transform (#4365)
* initial implementation -informal
* initial implementation -informal
* fix code style problem
* remove unnecessary code snippets.
* add license header
* add some dep
* fix JsonPathUtils checkstyle problem
* fix test class checkstyle problem
* fix CI problem
---
eventmesh-common/build.gradle | 5 +
.../eventmesh/common/filter/PatternEntry.java | 68 ++++++++
.../filter/condition/AnythingButCondition.java | 71 ++++++++
.../common/filter/condition/Condition.java | 29 ++++
.../common/filter/condition/ConditionsBuilder.java | 65 +++++++
.../common/filter/condition/ExistsCondition.java | 35 ++++
.../common/filter/condition/NumericCondition.java | 80 +++++++++
.../common/filter/condition/PrefixxCondition.java | 34 ++++
.../filter/condition/SpecifiedCondition.java | 34 ++++
.../common/filter/condition/SuffixCondition.java | 34 ++++
.../eventmesh/common/filter/pattern/Pattern.java | 87 ++++++++++
.../common/filter/patternbuild/PatternBuilder.java | 189 +++++++++++++++++++++
.../common/transform/ConstantTransformer.java | 34 ++++
.../eventmesh/common/transform/JsonPathParser.java | 88 ++++++++++
.../common/transform/OriginalTransformer.java | 26 +++
.../eventmesh/common/transform/Template.java | 47 +++++
.../common/transform/TemplateTransformer.java | 44 +++++
.../eventmesh/common/transform/Transformer.java | 32 ++++
.../common/transform/TransformerBuilder.java | 73 ++++++++
.../common/transform/TransformerType.java | 24 +++
.../eventmesh/common/transform/Variable.java | 30 ++++
.../eventmesh/common/utils/JsonPathUtils.java | 144 ++++++++++++++++
.../eventmesh/common/filter/PatternTest.java | 147 ++++++++++++++++
.../eventmesh/common/transform/TransformTest.java | 139 +++++++++++++++
tools/dependency-check/known-dependencies.txt | 5 +
25 files changed, 1564 insertions(+)
diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle
index 8fd6dbf1e..f2c627acd 100644
--- a/eventmesh-common/build.gradle
+++ b/eventmesh-common/build.gradle
@@ -25,6 +25,11 @@ dependencies {
api "org.apache.commons:commons-text"
api "org.apache.commons:commons-lang3"
+ implementation group: 'com.jayway.jsonpath', name: 'json-path', version:
'2.7.0'
+ implementation 'commons-net:commons-net:3.9.0'
+ api "io.cloudevents:cloudevents-core"
+ api "io.cloudevents:cloudevents-json-jackson"
+
implementation "org.apache.logging.log4j:log4j-api"
implementation "org.apache.logging.log4j:log4j-core"
implementation "org.apache.logging.log4j:log4j-slf4j-impl"
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/PatternEntry.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/PatternEntry.java
new file mode 100644
index 000000000..b6f747e67
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/PatternEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter;
+
+import org.apache.eventmesh.common.filter.condition.Condition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Pairs a JSON path with the conditions that are evaluated against the value found at that path.
+ */
+public class PatternEntry {
+
+    private final String patternPath;
+
+    private final List<Condition> conditionList = new ArrayList<>();
+
+    public PatternEntry(final String patternPath) {
+        this.patternPath = patternPath;
+    }
+
+    /**
+     * Appends a condition to this entry.
+     *
+     * @param patternCondition the condition to add
+     */
+    public void addRuleCondition(Condition patternCondition) {
+        conditionList.add(patternCondition);
+    }
+
+    public String getPatternName() {
+        // NOTE(review): fixed placeholder value, not derived from any state of this entry - confirm intent.
+        return "123";
+    }
+
+    public String getPatternPath() {
+        return this.patternPath;
+    }
+
+    /**
+     * Tests the given node against the registered conditions, in insertion order.
+     *
+     * @param jsonElement the node to test
+     * @return {@code true} as soon as one condition matches; {@code false} if none does or no condition is registered
+     */
+    public boolean match(JsonNode jsonElement) {
+        return conditionList.stream().anyMatch(rule -> rule.match(jsonElement));
+    }
+
+    /**
+     * Returns the condition list for test only
+     *
+     * @return the condition list
+     */
+    List<Condition> getConditionList() {
+        return this.conditionList;
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/AnythingButCondition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/AnythingButCondition.java
new file mode 100644
index 000000000..f64dd1fc4
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/AnythingButCondition.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches only when none of its nested conditions match.
+ */
+class AnythingButCondition implements Condition {
+
+    private final List<Condition> conditionList = new ArrayList<>();
+
+    /**
+     * Builds the nested conditions from the rule value: a scalar or each array element becomes a
+     * {@link SpecifiedCondition}; each field of an object is turned into a condition by {@link ConditionsBuilder}.
+     *
+     * @param condition the value configured for this rule
+     */
+    AnythingButCondition(JsonNode condition) {
+        // single scalar value
+        if (condition.isValueNode()) {
+            conditionList.add(new SpecifiedCondition(condition));
+        }
+
+        // list of values
+        if (condition.isArray()) {
+            for (JsonNode item : condition) {
+                conditionList.add(new SpecifiedCondition(item));
+            }
+        }
+
+        // keyed conditions such as prefix / suffix
+        if (condition.isObject()) {
+            for (Iterator<Entry<String, JsonNode>> it = condition.fields(); it.hasNext(); ) {
+                Entry<String, JsonNode> field = it.next();
+                Condition nested = new ConditionsBuilder().withKey(field.getKey()).withParams(field.getValue()).build();
+                conditionList.add(nested);
+            }
+        }
+    }
+
+    /**
+     * @param inputEvent the node to test
+     * @return {@code false} when the node is {@code null} or any nested condition matches it; {@code true} otherwise
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        if (inputEvent == null) {
+            return false;
+        }
+        return conditionList.stream().noneMatch(excluded -> excluded.match(inputEvent));
+    }
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/Condition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/Condition.java
new file mode 100644
index 000000000..4ae08616b
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/Condition.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * A single filter rule that is evaluated against a JSON node.
+ */
+public interface Condition {
+
+ /**
+ * Evaluates this condition against the given node.
+ *
+ * @param inputEvent the JSON node to test; {@code ExistsCondition} and {@code AnythingButCondition}
+ *                   explicitly check it for {@code null}
+ * @return {@code true} if the node satisfies this condition
+ */
+ boolean match(JsonNode inputEvent);
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ConditionsBuilder.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ConditionsBuilder.java
new file mode 100644
index 000000000..bceb09d57
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ConditionsBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Fluent builder that maps a condition keyword to the matching {@link Condition} implementation.
+ */
+public class ConditionsBuilder {
+
+    private String key;
+    private JsonNode jsonNode;
+
+    /**
+     * @param key the condition keyword, e.g. {@code "prefix"} or {@code "anything-but"}
+     * @return this builder
+     */
+    public ConditionsBuilder withKey(String key) {
+        this.key = key;
+        return this;
+    }
+
+    /**
+     * @param jsonNode the value configured for the condition
+     * @return this builder
+     */
+    public ConditionsBuilder withParams(JsonNode jsonNode) {
+        this.jsonNode = jsonNode;
+        return this;
+    }
+
+    /**
+     * Creates the condition selected by the configured key.
+     *
+     * @return the new condition, never {@code null}
+     * @throws RuntimeException if no key was set or the key is not a supported keyword
+     */
+    public Condition build() {
+        if (this.key == null) {
+            // A switch on a null String throws a bare NullPointerException; report it like any other invalid key.
+            throw new RuntimeException("INVALID CONDITION");
+        }
+        switch (this.key) {
+            case "prefix":
+                return new PrefixxCondition(this.jsonNode);
+            case "suffix":
+                return new SuffixCondition(this.jsonNode);
+            case "anything-but":
+                return new AnythingButCondition(this.jsonNode);
+            case "numeric":
+                return new NumericCondition(this.jsonNode);
+            case "exists":
+                return new ExistsCondition(this.jsonNode);
+            case "specified":
+                return new SpecifiedCondition(this.jsonNode);
+            default:
+                throw new RuntimeException("INVALID CONDITION");
+        }
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ExistsCondition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ExistsCondition.java
new file mode 100644
index 000000000..95d765d71
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ExistsCondition.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition on the presence of a value: the configured boolean is compared with whether a node was supplied.
+ */
+class ExistsCondition implements Condition {
+
+    private final JsonNode exists;
+
+    ExistsCondition(JsonNode exists) {
+        this.exists = exists;
+    }
+
+    /**
+     * @param inputEvent the node to test; {@code null} is treated as "not present"
+     * @return {@code true} when the configured flag equals the presence of {@code inputEvent}
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        final boolean expectedPresent = this.exists.asBoolean();
+        final boolean actuallyPresent = inputEvent != null;
+        return expectedPresent == actuallyPresent;
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/NumericCondition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/NumericCondition.java
new file mode 100644
index 000000000..b5c28a4d1
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/NumericCondition.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition made of (operator, number) pairs that must all hold for a numeric input.
+ */
+class NumericCondition implements Condition {
+
+    List<String> operators = new ArrayList<>();
+    List<Double> nums = new ArrayList<>();
+
+    /**
+     * Reads the rule from an array of alternating operators and numbers, e.g. {@code [">", 0, "<=", 5]}.
+     *
+     * @param numeric the rule value
+     * @throws RuntimeException if the value is not an array or has an odd number of elements
+     */
+    NumericCondition(JsonNode numeric) {
+        if (!numeric.isArray()) {
+            throw new RuntimeException("NUMERIC MUST BE ARRAY");
+        }
+        if (numeric.size() % 2 != 0) {
+            throw new RuntimeException("NUMERIC NO RIGHT FORMAT");
+        }
+        for (int i = 0; i < numeric.size(); i += 2) {
+            operators.add(numeric.get(i).asText());
+            nums.add(numeric.get(i + 1).asDouble());
+        }
+    }
+
+    /**
+     * Evaluates {@code target <opt> rule}; an unrecognised operator yields {@code false}.
+     */
+    private boolean compareNums(double rule, double target, String opt) {
+        int res = Double.compare(target, rule);
+        switch (opt) {
+            case "=":
+                return res == 0;
+            case "!=":
+                return res != 0;
+            case ">":
+                return res > 0;
+            case ">=":
+                return res >= 0;
+            case "<":
+                return res < 0;
+            case "<=":
+                return res <= 0;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * @param inputEvent the node to test
+     * @return {@code true} only when the node is a JSON number that satisfies every (operator, number) pair;
+     *         {@code false} for {@code null} or non-numeric nodes
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // A missing or non-numeric value can never satisfy a numeric rule.
+        if (inputEvent == null || !inputEvent.isNumber()) {
+            return false;
+        }
+        final double target = inputEvent.asDouble();
+        for (int i = 0; i < operators.size(); ++i) {
+            if (!compareNums(nums.get(i), target, operators.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/PrefixxCondition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/PrefixxCondition.java
new file mode 100644
index 000000000..22c1afa61
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/PrefixxCondition.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches when the text form of the input starts with the configured prefix.
+ */
+class PrefixxCondition implements Condition {
+
+    private final String prefix;
+
+    /**
+     * @param prefix node whose text form is the required prefix
+     */
+    public PrefixxCondition(JsonNode prefix) {
+        this.prefix = prefix.asText();
+    }
+
+    /**
+     * @param inputEvent the node to test
+     * @return {@code true} if the node is not {@code null} and its text form starts with the prefix
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // A null node means no value was found; treat it as a non-match instead of dereferencing it.
+        return inputEvent != null && inputEvent.asText().startsWith(this.prefix);
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SpecifiedCondition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SpecifiedCondition.java
new file mode 100644
index 000000000..b6fa884a1
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SpecifiedCondition.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches when the text form of the input equals the text form of the configured value.
+ */
+class SpecifiedCondition implements Condition {
+
+    private final JsonNode specified;
+
+    SpecifiedCondition(JsonNode jsonNode) {
+        specified = jsonNode;
+    }
+
+    /**
+     * @param inputEvent the node to test
+     * @return {@code true} if the node is not {@code null} and both text forms are equal
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // A null node means no value was found; treat it as a non-match instead of dereferencing it.
+        return inputEvent != null && inputEvent.asText().equals(specified.asText());
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SuffixCondition.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SuffixCondition.java
new file mode 100644
index 000000000..b7187d6c1
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SuffixCondition.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches when the text form of the input ends with the configured suffix.
+ */
+class SuffixCondition implements Condition {
+
+    private final String suffix;
+
+    /**
+     * @param suffix node whose text form is the required suffix
+     */
+    public SuffixCondition(JsonNode suffix) {
+        this.suffix = suffix.asText();
+    }
+
+    /**
+     * @param inputEvent the node to test
+     * @return {@code true} if the node is not {@code null} and its text form ends with the suffix
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // A null node means no value was found; treat it as a non-match instead of dereferencing it.
+        return inputEvent != null && inputEvent.asText().endsWith(this.suffix);
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/pattern/Pattern.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/pattern/Pattern.java
new file mode 100644
index 000000000..6187a668b
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/pattern/Pattern.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.pattern;
+
+import org.apache.eventmesh.common.filter.PatternEntry;
+import org.apache.eventmesh.common.utils.JsonPathUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.jayway.jsonpath.PathNotFoundException;
+
+/**
+ * A compiled filter: two lists of {@link PatternEntry} (required fields and data fields) that an event's
+ * JSON content must satisfy.
+ */
+public class Pattern {
+
+    private List<PatternEntry> requiredFieldList = new ArrayList<>();
+    private List<PatternEntry> dataList = new ArrayList<>();
+
+    public void addRequiredFieldList(PatternEntry patternEntry) {
+        this.requiredFieldList.add(patternEntry);
+    }
+
+    public void addDataList(PatternEntry patternEntry) {
+        this.dataList.add(patternEntry);
+    }
+
+    /**
+     * Checks the given JSON content against every entry of this pattern.
+     *
+     * @param content the event as a JSON string
+     * @return {@code true} only if all required-field entries and all data entries match
+     * @throws RuntimeException wrapping a {@code PathNotFoundException} or {@code JsonProcessingException}
+     *                          raised while extracting or parsing a value
+     */
+    public boolean filter(String content) {
+        // The content is passed down as an argument rather than stored on the instance,
+        // so one Pattern can evaluate several events without them overwriting each other.
+        return matchEntries(content, requiredFieldList) && matchEntries(content, dataList);
+    }
+
+    /**
+     * Returns {@code true} when every entry in {@code entries} matches the value found at its path.
+     */
+    private boolean matchEntries(String content, List<PatternEntry> entries) {
+        for (final PatternEntry patternEntry : entries) {
+            try {
+                String matchRes = JsonPathUtils.matchJsonPathValue(content, patternEntry.getPatternPath());
+
+                JsonNode jsonElement = null;
+                if (StringUtils.isNoneBlank(matchRes)) {
+                    jsonElement = JsonPathUtils.parseStrict(matchRes);
+                }
+
+                if (!matchesEntry(patternEntry, jsonElement)) {
+                    return false;
+                }
+            } catch (PathNotFoundException | JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Matches one entry against one extracted value. An array value matches when at least one of its
+     * elements matches; any other value (including {@code null}) is handed to the entry as is.
+     */
+    private static boolean matchesEntry(PatternEntry patternEntry, JsonNode jsonElement) {
+        if (jsonElement != null && jsonElement.isArray()) {
+            for (JsonNode element : jsonElement) {
+                if (patternEntry.match(element)) {
+                    return true;
+                }
+            }
+            // No element satisfied the entry, so the entry as a whole fails.
+            return false;
+        }
+        return patternEntry.match(jsonElement);
+    }
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/patternbuild/PatternBuilder.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/patternbuild/PatternBuilder.java
new file mode 100644
index 000000000..4abf2af06
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/patternbuild/PatternBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.patternbuild;
+
+import org.apache.eventmesh.common.exception.JsonException;
+import org.apache.eventmesh.common.filter.PatternEntry;
+import org.apache.eventmesh.common.filter.condition.Condition;
+import org.apache.eventmesh.common.filter.condition.ConditionsBuilder;
+import org.apache.eventmesh.common.filter.pattern.Pattern;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import io.cloudevents.SpecVersion;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Parses a JSON filter definition into a {@link Pattern}.
+ */
+public class PatternBuilder {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Builds a pattern from its JSON definition. The {@code data} key holds a nested object of rules;
+     * every other key must be a CloudEvents v1 attribute whose value is a non-empty array of rules.
+     *
+     * @param jsonStr the filter definition
+     * @return the pattern, or {@code null} if the definition is empty or not a JSON object
+     * @throws JsonException if the string is not valid JSON, a key is not allowed, or a value has the wrong shape
+     */
+    public static Pattern build(String jsonStr) {
+        JsonNode jsonNode;
+        try {
+            jsonNode = mapper.readTree(jsonStr);
+        } catch (Exception e) {
+            throw new JsonException("INVALID_JSON_STRING", e);
+        }
+
+        // readTree may yield null for empty input depending on the Jackson version.
+        if (jsonNode == null || jsonNode.isEmpty() || !jsonNode.isObject()) {
+            return null;
+        }
+
+        Pattern pattern = new Pattern();
+        Iterator<Entry<String, JsonNode>> iterator = jsonNode.fields();
+        while (iterator.hasNext()) {
+            Map.Entry<String, JsonNode> entry = iterator.next();
+            String key = entry.getKey();
+            JsonNode value = entry.getValue();
+
+            if ("data".equals(key)) {
+                parseDataField(value, pattern);
+                continue;
+            }
+
+            if (!value.isArray()) {
+                throw new JsonException("INVALID_JSON_STRING");
+            }
+
+            if (!SpecVersion.V1.getAllAttributes().contains(key)) {
+                throw new JsonException("INVALID_JSON_KEY");
+            }
+
+            // Register the field exactly once; registering it twice only makes filtering evaluate it twice.
+            parseRequiredField("$." + key, value, pattern);
+        }
+
+        return pattern;
+    }
+
+    /**
+     * Walks the {@code data} object breadth-first. Nested objects extend the JSON path; every other value
+     * becomes a {@link PatternEntry} at the accumulated path, with one condition per array element.
+     */
+    private static void parseDataField(JsonNode jsonNode, Pattern pattern) {
+        if (!jsonNode.isObject()) {
+            throw new JsonException("INVALID_JSON_KEY");
+        }
+        Queue<Node> queueNode = new ArrayDeque<>();
+        queueNode.add(new Node("$.data", "data", jsonNode));
+        while (!queueNode.isEmpty()) {
+            Node ele = queueNode.poll();
+            String elepath = ele.getPath();
+            Iterator<Map.Entry<String, JsonNode>> iterator = ele.getValue().fields();
+            while (iterator.hasNext()) {
+                Map.Entry<String, JsonNode> entry = iterator.next();
+                // e.g. state
+                String key = entry.getKey();
+                // e.g. [{"anything-but":"initializing"}]
+                JsonNode value = entry.getValue();
+                String childPath = elepath + "." + key;
+
+                if (value.isObject()) {
+                    queueNode.add(new Node(childPath, key, value));
+                    continue;
+                }
+
+                PatternEntry patternEntry = new PatternEntry(childPath);
+                if (value.isArray()) {
+                    for (JsonNode rule : value) {
+                        patternEntry.addRuleCondition(parseCondition(rule));
+                    }
+                }
+                // NOTE(review): a scalar value yields an entry without conditions - confirm this is intended.
+                pattern.addDataList(patternEntry);
+            }
+        }
+    }
+
+    /**
+     * Converts one rule node into a condition. A scalar becomes a "specified" condition; for an object only
+     * its first field is used, the field name selecting the condition type.
+     *
+     * @throws JsonException if the node is neither a scalar nor an object with at least one field
+     */
+    private static Condition parseCondition(JsonNode jsonNode) {
+        if (jsonNode.isValueNode()) {
+            return new ConditionsBuilder().withKey("specified").withParams(jsonNode).build();
+        }
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = jsonNode.fields();
+        if (!iterator.hasNext()) {
+            // Fail while building instead of storing a null condition that would break matching later.
+            throw new JsonException("INVALID_PATTERN_VALUE ");
+        }
+        Map.Entry<String, JsonNode> entry = iterator.next();
+        return new ConditionsBuilder().withKey(entry.getKey()).withParams(entry.getValue()).build();
+    }
+
+    /**
+     * Adds a required-field entry for {@code path} with one condition per element of {@code jsonNode}.
+     *
+     * @throws JsonException if {@code jsonNode} has no elements
+     */
+    private static void parseRequiredField(String path, JsonNode jsonNode, Pattern pattern) {
+        if (jsonNode.isEmpty()) {
+            // Empty array
+            throw new JsonException("INVALID_PATTERN_VALUE ");
+        }
+        PatternEntry patternEntry = new PatternEntry(path);
+        for (final JsonNode objNode : jsonNode) {
+            Condition condition = parseCondition(objNode);
+            patternEntry.addRuleCondition(condition);
+        }
+
+        pattern.addRequiredFieldList(patternEntry);
+    }
+
+    /**
+     * Work item of the breadth-first walk: a JSON node together with its path and key.
+     */
+    static class Node {
+
+        private String path;
+        private String key;
+        private JsonNode value;
+
+        Node(String path, String key, JsonNode value) {
+            this.path = path;
+            this.key = key;
+            this.value = value;
+        }
+
+        String getPath() {
+            return this.path;
+        }
+
+        String getKey() {
+            return this.key;
+        }
+
+        JsonNode getValue() {
+            return this.value;
+        }
+    }
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/ConstantTransformer.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/ConstantTransformer.java
new file mode 100644
index 000000000..e3df38e1e
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/ConstantTransformer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+
+/**
+ * Transformer that ignores its input and always returns the string supplied at construction.
+ */
+class ConstantTransformer implements Transformer {
+
+    private final String constantValue;
+
+    ConstantTransformer(String jsonpath) {
+        constantValue = jsonpath;
+    }
+
+    /**
+     * @param json the input, which is not inspected
+     * @return the string given to the constructor
+     */
+    @Override
+    public String transform(String json) throws EventMeshException {
+        return constantValue;
+    }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/JsonPathParser.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/JsonPathParser.java
new file mode 100644
index 000000000..d35a22205
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/JsonPathParser.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.utils.JsonPathUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Holds the variable definitions of a template transformer (name to JSONPath
+ * expression or constant) and resolves them against an event payload.
+ */
+public class JsonPathParser {
+
+ // Variable definitions collected by the constructor, in field iteration order.
+ protected List<Variable> variablesList = new ArrayList<>();
+
+ // Returns the internal list itself (no copy), so callers can mutate it.
+ public List<Variable> getVariablesList() {
+ return variablesList;
+ }
+
+ /**
+ * Parses a JSON configuration string into the variable list. Every field of
+ * the parsed node becomes one {@link Variable} whose name is the field name
+ * and whose value is the field's text form.
+ *
+ * @param jsonPathString JSON text mapping variable names to scalar values
+ * (JSONPath expressions or constants)
+ * @throws EventMeshException if any field value is not a value node
+ * (for example a nested object or an array)
+ */
+ public JsonPathParser(String jsonPathString) {
+ // parseStrict reports malformed JSON or trailing tokens with its own exception.
+ JsonNode jsonObject = JsonPathUtils.parseStrict(jsonPathString);
+ // NOTE(review): a non-object root (array, scalar) appears to yield no fields
+ // and therefore an empty variable list instead of an error - confirm.
+ Iterator<Map.Entry<String, JsonNode>> fields = jsonObject.fields();
+
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = fields.next();
+ String name = entry.getKey();
+ JsonNode valueNode = entry.getValue();
+ if (valueNode.isValueNode()) {
+ variablesList.add(new Variable(name, valueNode.asText()));
+ } else {
+ throw new EventMeshException("invalid config:" +
jsonPathString);
+ }
+
+ }
+
+ }
+
+ /**
+ * Resolves every configured variable against the given JSON document.
+ * A variable whose value passes {@link JsonPathUtils#isValidAndDefinite(String)}
+ * is evaluated as a JSONPath and replaced by a new {@link Variable} carrying
+ * the matched value as a string; any other variable is passed through
+ * unchanged, i.e. its configured value is used as-is.
+ *
+ * @param json the event payload to evaluate the paths against
+ * @return a new list of resolved variables; an empty list when {@code json}
+ * is {@code null} or empty
+ * @throws JsonProcessingException declared for callers; none of the
+ * JsonPathUtils methods called here declares it
+ */
+
+ public List<Variable> match(String json) throws JsonProcessingException {
+
+ // Nothing to match against: return no variables at all, so a template
+ // substituted with this result keeps its placeholders.
+ if (json == null || json.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List<Variable> variableList = new ArrayList<>(variablesList.size());
+ for (Variable element : variablesList) {
+ if (JsonPathUtils.isValidAndDefinite(element.getJsonPath())) {
+ // The helper returns the literal string "null" when nothing matches.
+ String res = JsonPathUtils.matchJsonPathValueWithString(json,
element.getJsonPath());
+ Variable variable = new Variable(element.getName(), res);
+ variableList.add(variable);
+ } else {
+ // Not a definite JSONPath: keep the configured value as a constant.
+ variableList.add(element);
+ }
+
+ }
+ return variableList;
+ }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/OriginalTransformer.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/OriginalTransformer.java
new file mode 100644
index 000000000..708a2aeb2
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/OriginalTransformer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+/**
+ * {@link Transformer} that performs no transformation: the input is handed
+ * back unchanged.
+ */
+class OriginalTransformer implements Transformer {
+
+ /**
+ * @param json the event payload
+ * @return the same reference that was passed in, including {@code null}
+ */
+ @Override
+ public String transform(String json) {
+ return json;
+ }
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Template.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Template.java
new file mode 100644
index 000000000..348ed1a93
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Template.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+
+import org.apache.commons.text.StringSubstitutor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Text template whose placeholders are filled in from a list of
+ * {@link Variable}s using Apache Commons Text {@link StringSubstitutor}.
+ */
+@Data
+@AllArgsConstructor
+public class Template {
+
+ // Raw template text containing the placeholders to be replaced.
+ private String template;
+
+ /**
+ * Replaces placeholders in the template with the values of the given variables.
+ *
+ * @param variables variables whose name is the placeholder key and whose
+ * {@code jsonPath} field holds the replacement text
+ * @return the template text after substitution by {@link StringSubstitutor}
+ */
+ public String substitute(List<Variable> variables) throws
EventMeshException {
+
+ // Variables with a null value are dropped before collecting.
+ // NOTE(review): Collectors.toMap rejects duplicate keys; presumably names
+ // are unique because they originate from JSON field names - confirm.
+ Map<String, String> valuesMap = variables.stream()
+ .filter(variable -> variable.getJsonPath() != null)
+ .collect(Collectors.toMap(Variable::getName,
Variable::getJsonPath));
+ StringSubstitutor sub = new StringSubstitutor(valuesMap);
+
+ return sub.replace(template);
+
+ }
+}
\ No newline at end of file
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TemplateTransformer.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TemplateTransformer.java
new file mode 100644
index 000000000..ea42d05de
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TemplateTransformer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * {@link Transformer} that extracts variables from the event with a
+ * {@link JsonPathParser} and renders them into a {@link Template}.
+ */
+class TemplateTransformer implements Transformer {
+
+ // Resolves the configured variables against each incoming event.
+ private final JsonPathParser jsonPathParser;
+
+ // Template text that receives the resolved variable values.
+ private final Template template;
+
+ /**
+ * @param jsonPathParser parser holding the variable definitions
+ * @param template template to render with the resolved variables
+ */
+ TemplateTransformer(JsonPathParser jsonPathParser, Template template) {
+ this.template = template;
+ this.jsonPathParser = jsonPathParser;
+ }
+
+ /**
+ * Resolves the variables against {@code json} and substitutes them into the template.
+ *
+ * @param json the event payload
+ * @return the rendered template text
+ * @throws JsonProcessingException propagated from {@link JsonPathParser#match(String)}
+ */
+ @Override
+ public String transform(String json) throws JsonProcessingException {
+ // Step 1: resolve every configured variable against the event.
+ List<Variable> variableList = jsonPathParser.match(json);
+ // Step 2: substitute the resolved values into the template.
+ String res = template.substitute(variableList);
+ return res;
+ }
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Transformer.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Transformer.java
new file mode 100644
index 000000000..fecfe4b18
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Transformer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * EventMesh transformer interface. The transformer implementations in this
+ * package are:
+ * 1. Constant
+ * 2. Original
+ * 3. Template
+ */
+public interface Transformer {
+
+ /**
+ * Transforms an event payload into its output representation.
+ *
+ * @param json the event payload as a JSON string
+ * @return the transformed text
+ * @throws JsonProcessingException if an implementation fails while processing JSON
+ */
+ String transform(String json) throws JsonProcessingException;
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerBuilder.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerBuilder.java
new file mode 100644
index 000000000..0eafdebb7
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+
+/**
+ * Factory for {@link Transformer} instances, usable either through the static
+ * {@code build*} methods or through the fluent {@link Builder}.
+ */
+public class TransformerBuilder {
+
+ /**
+ * Fluent builder that selects the transformer implementation from a
+ * {@link TransformerType}. Fields that are never set stay {@code null} and
+ * are passed on as such.
+ */
+ public static final class Builder {
+
+ private final TransformerType transformerType;
+ // Template text; only read for TEMPLATE.
+ private String template;
+ // Constant value for CONSTANT, variable-definition JSON for TEMPLATE.
+ private String content;
+
+ public Builder(TransformerType transformerType) {
+ this.transformerType = transformerType;
+ }
+
+ public Builder setContent(String content) {
+ this.content = content;
+ return this;
+ }
+
+ public Builder setTemplate(String template) {
+ this.template = template;
+ return this;
+ }
+
+ /**
+ * Creates the transformer for the configured type.
+ *
+ * @return a new transformer instance
+ * @throws EventMeshException if the type is not one of the handled constants
+ */
+ public Transformer build() {
+ // A null transformerType makes this switch throw NullPointerException.
+ switch (this.transformerType) {
+ case CONSTANT:
+ return buildConstantTransformer(this.content);
+ case ORIGINAL:
+ return buildOriginalTransformer();
+ case TEMPLATE:
+ return buildTemplateTransFormer(this.content,
this.template);
+ default:
+ throw new EventMeshException("invalid config");
+ }
+ }
+
+ }
+
+ /**
+ * Builds a template transformer.
+ *
+ * @param jsonContent JSON text defining the variables; parsed by {@link JsonPathParser}
+ * @param template template text that references the variables
+ * @return a transformer that renders {@code template} for each event
+ */
+ public static Transformer buildTemplateTransFormer(String jsonContent,
String template) {
+ JsonPathParser jsonPathParser = new JsonPathParser(jsonContent);
+ Template templateEntry = new Template(template);
+ return new TemplateTransformer(jsonPathParser, templateEntry);
+ }
+
+ /**
+ * @param constant the value the transformer returns for every event
+ * @return a transformer that always yields {@code constant}
+ */
+ public static Transformer buildConstantTransformer(String constant) {
+ return new ConstantTransformer(constant);
+ }
+
+ /**
+ * @return a transformer that returns its input unchanged
+ */
+ public static Transformer buildOriginalTransformer() {
+ return new OriginalTransformer();
+ }
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerType.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerType.java
new file mode 100644
index 000000000..13a5c05d4
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+/**
+ * Kinds of transformer that {@code TransformerBuilder.Builder} can create.
+ */
+public enum TransformerType {
+ // Output equals the input event.
+ ORIGINAL,
+ // Output is a fixed string, independent of the event.
+ CONSTANT,
+ // Output is a template rendered with values extracted from the event.
+ TEMPLATE
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Variable.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Variable.java
new file mode 100644
index 000000000..b7f139f2b
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Variable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Name/value pair used by the template transformer.
+ */
+@Data
+@AllArgsConstructor
+public class Variable {
+
+ // Variable name; used as the placeholder key during template substitution.
+ private String name;
+ // Holds the configured JSONPath expression or constant when created by the
+ // JsonPathParser constructor, and the resolved value when created by
+ // JsonPathParser.match().
+ private String jsonPath;
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonPathUtils.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonPathUtils.java
new file mode 100644
index 000000000..175b322a8
--- /dev/null
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonPathUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.exception.JsonException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.ReadContext;
+import com.jayway.jsonpath.internal.path.CompiledPath;
+import com.jayway.jsonpath.internal.path.PathCompiler;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+
+/**
+ * Static helpers for parsing JSON with Jackson and evaluating JSONPath
+ * expressions with the Jayway json-path library.
+ */
+public class JsonPathUtils {
+
+ // Regular expression matching a literal dot; not referenced inside this class.
+ public static final String JSONPATH_SPLIT = "\\.";
+ // Prefix a path must start with to be accepted by isValidAndDefinite().
+ public static final String JSONPATH_PREFIX = "$";
+ public static final String JSONPATH_PREFIX_WITH_POINT = "$.";
+ public static final String JSONPATH_DATA = "$.data";
+ // Shared mapper created with default settings; no extra features are
+ // configured on it here despite the "STRICT" name.
+ private static final ObjectMapper STRICT_OBJECT_MAPPER = new
ObjectMapper();
+
+ // json-path configuration that uses the shared Jackson mapper as JSON provider.
+ private static final Configuration JSON_PATH_CONFIG =
Configuration.builder()
+ .jsonProvider(new JacksonJsonProvider(STRICT_OBJECT_MAPPER))
+ .build();
+
+ /**
+ * Tells whether the given text parses to a JSON object without fields.
+ *
+ * @param jsonString JSON text to inspect
+ * @return {@code true} only if the parsed node is an object and is empty
+ * @throws JsonException wrapping any exception raised while parsing or inspecting
+ */
+ public static boolean isEmptyJsonObject(String jsonString) {
+ try {
+ JsonNode jsonNode = STRICT_OBJECT_MAPPER.readTree(jsonString);
+ return jsonNode.isObject() && jsonNode.isEmpty();
+ } catch (Exception e) {
+ throw new JsonException("INVALID_JSON_STRING", e);
+ }
+ }
+
+ /**
+ * Parses JSON text and additionally rejects input that carries further
+ * tokens after the first complete value.
+ *
+ * @param json JSON text to parse
+ * @return the parsed tree
+ * @throws JsonException if parsing fails or trailing tokens are found
+ */
+ public static JsonNode parseStrict(String json) throws JsonException {
+ try {
+ JsonParser parser =
STRICT_OBJECT_MAPPER.getFactory().createParser(json);
+ JsonNode result = STRICT_OBJECT_MAPPER.readTree(parser);
+ if (parser.nextToken() != null) {
+ // Check if there are more tokens after reading the root object
+ // NOTE(review): this exception is itself caught by the catch block
+ // below (assuming JsonException is an Exception subclass) and is
+ // re-thrown wrapped in the "not valid in strict way" message.
+ throw new JsonException("Additional tokens found after
parsing: " + json);
+ }
+ return result;
+ } catch (Exception e) {
+ throw new JsonException("Json is not valid in strict way: " +
json, e);
+ }
+ }
+
+ /**
+ * Serializes a single key/value pair as a JSON object string.
+ *
+ * @param key field name
+ * @param value field value, serialized by Jackson
+ * @return the JSON text, or {@code null} if serialization fails
+ */
+ public static String buildJsonString(String key, Object value) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(key, value);
+
+ // A fresh mapper is created per call instead of reusing the shared one.
+ ObjectMapper objectMapper = new ObjectMapper();
+ try {
+ return objectMapper.writeValueAsString(map);
+ } catch (JsonProcessingException e) {
+ // NOTE(review): the failure is swallowed; callers only see null.
+ return null;
+ }
+ }
+
+ /**
+ * Checks whether a string is a compilable JSONPath that the json-path
+ * compiler reports as definite.
+ *
+ * @param jsonPath candidate expression
+ * @return {@code false} for null/empty input, input not starting with
+ * {@code $}, or input that fails to compile; otherwise the result
+ * of {@code CompiledPath.isDefinite()}
+ */
+ public static boolean isValidAndDefinite(String jsonPath) {
+ if (Strings.isNullOrEmpty(jsonPath) ||
!jsonPath.startsWith(JSONPATH_PREFIX)) {
+ return Boolean.FALSE;
+ }
+ CompiledPath compiledPath = null;
+ try {
+ // NOTE(review): PathCompiler and CompiledPath live in json-path's
+ // "internal" package; the cast relies on that implementation detail.
+ compiledPath = (CompiledPath) PathCompiler.compile(jsonPath);
+ } catch (InvalidPathException e) {
+ return Boolean.FALSE;
+ }
+ return compiledPath.isDefinite();
+ }
+
+ /**
+ * Reads the value at {@code jsonPath} as a {@link JsonNode} and returns its
+ * string form.
+ *
+ * @param content JSON document
+ * @param jsonPath JSONPath expression
+ * @return the node's {@code toString()} result, or {@code null} when
+ * json-path raises {@link InvalidPathException}
+ * @throws EventMeshException if either argument is null or empty
+ */
+ public static String getJsonPathValue(String content, String jsonPath) {
+ if (Strings.isNullOrEmpty(content) || Strings.isNullOrEmpty(jsonPath))
{
+ throw new EventMeshException("invalid config" + jsonPath);
+ }
+ Object obj = null;
+ try {
+ obj =
JsonPath.using(JSON_PATH_CONFIG).parse(content).read(jsonPath, JsonNode.class);
+ } catch (InvalidPathException invalidPathException) {
+ return null;
+ }
+ // NOTE(review): if read() can return null (e.g. for a JSON null value),
+ // this dereference would throw NullPointerException - confirm.
+ return obj.toString();
+ }
+
+ /**
+ * Parses JSON text into a {@link JsonNode} using the shared mapper.
+ *
+ * @param object JSON text
+ * @return the parsed tree
+ * @throws JsonProcessingException if Jackson cannot parse the text
+ */
+ public static JsonNode convertToJsonNode(String object) throws
JsonProcessingException {
+ return STRICT_OBJECT_MAPPER.readValue(object, JsonNode.class);
+ }
+
+ /**
+ * Evaluates {@code jsonPath} against {@code jsonString} and returns the
+ * result as a string.
+ *
+ * @param jsonString JSON document
+ * @param jsonPath JSONPath expression
+ * @return the result's {@code toString()}, or the literal string
+ * {@code "null"} when {@link #jsonPathParse(String, String)}
+ * returns {@code null}
+ * @throws EventMeshException if either argument is null or empty
+ */
+ public static String matchJsonPathValueWithString(String jsonString,
String jsonPath) {
+ Object obj = jsonPathParse(jsonString, jsonPath);
+
+ if (obj == null) {
+ return "null";
+ }
+
+ return obj.toString();
+ }
+
+ /**
+ * Evaluates {@code jsonPath} against {@code jsonString}.
+ *
+ * @param jsonString JSON document
+ * @param jsonPath JSONPath expression
+ * @return whatever json-path's {@code read} returns, or {@code null} when it
+ * raises {@link InvalidPathException}
+ * @throws EventMeshException if either argument is null or empty
+ */
+ public static Object jsonPathParse(String jsonString, String jsonPath) {
+ if (Strings.isNullOrEmpty(jsonPath) ||
Strings.isNullOrEmpty(jsonString)) {
+ throw new EventMeshException("invalid config" + jsonPath);
+ }
+ Object obj = null;
+ try {
+ final ReadContext readContext =
JsonPath.using(JSON_PATH_CONFIG).parse(jsonString);
+ obj = readContext.read(jsonPath);
+ } catch (InvalidPathException invalidPathException) {
+ return null;
+ }
+ return obj;
+ }
+
+ /**
+ * Evaluates {@code jsonPath} against {@code jsonString} and serializes the
+ * result back to JSON text with the shared mapper.
+ *
+ * @param jsonString JSON document
+ * @param jsonPath JSONPath expression
+ * @return the JSON serialization of the evaluation result
+ * @throws JsonProcessingException if the result cannot be serialized
+ * @throws EventMeshException if either argument is null or empty
+ */
+ public static String matchJsonPathValue(String jsonString, String
jsonPath) throws JsonProcessingException {
+ Object obj = jsonPathParse(jsonString, jsonPath);
+ return STRICT_OBJECT_MAPPER.writer().writeValueAsString(obj);
+ }
+}
diff --git
a/eventmesh-common/src/test/java/org/apache/eventmesh/common/filter/PatternTest.java
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/filter/PatternTest.java
new file mode 100644
index 000000000..f5b14515f
--- /dev/null
+++
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/filter/PatternTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter;
+
+import org.apache.eventmesh.common.filter.pattern.Pattern;
+import org.apache.eventmesh.common.filter.patternbuild.PatternBuilder;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for event filtering: each case builds a {@link Pattern} from a JSON
+ * condition with {@link PatternBuilder} and applies it to a fixed sample event.
+ */
+public class PatternTest {
+
+ // Sample event shared by all tests; the conditions below refer to its
+ // "source", "subject" and "data" fields.
+ private final String event = "{\n"
+ + "\"id\": \"4b26115b-73e-cf74a******\",\n"
+ + " \"specversion\": \"1.0\",\n"
+ + "\"source\": \"eventmesh.source\",\n"
+ + "\"type\": \"object:put\",\n"
+ + "\"datacontenttype\": \"application/json\",\n"
+ + "\"subject\": \"xxx.jpg\",\n"
+ + "\"time\": \"2022-01-17T12:07:48.955Z\",\n"
+ + "\"data\": {\n"
+ + "\"name\": \"test01\",\n"
+ + "\"state\": \"enable\",\n"
+ + "\"num\": 10 ,\n"
+ + "\"num1\": 50.7 \n"
+ + "}\n"
+ + " }";
+
+ // NOTE(review): this condition is identical to the one in testPrefixFilter,
+ // so no specified-value condition is exercised here - presumably a
+ // copy-paste leftover; confirm the intended condition.
+ @Test
+ public void testSpecifiedFilter() {
+ String condition = "{\n"
+ + " \"source\":[\n"
+ + " {\n"
+ + " \"prefix\":\"eventmesh.\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ Pattern pattern = PatternBuilder.build(condition);
+ Boolean res = pattern.filter(event);
+ Assertions.assertEquals(true, res);
+ }
+
+ // "source" is "eventmesh.source", which starts with "eventmesh.".
+ @Test
+ public void testPrefixFilter() {
+ String condition = "{\n"
+ + " \"source\":[\n"
+ + " {\n"
+ + " \"prefix\":\"eventmesh.\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ Pattern pattern = PatternBuilder.build(condition);
+ Boolean res = pattern.filter(event);
+ Assertions.assertEquals(true, res);
+ }
+
+ // "subject" is "xxx.jpg", which ends with ".jpg".
+ @Test
+ public void testSuffixFilter() {
+ String condition = "{\n"
+ + " \"subject\":[\n"
+ + " {\n"
+ + " \"suffix\":\".jpg\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ Pattern pattern = PatternBuilder.build(condition);
+ Boolean res = pattern.filter(event);
+ Assertions.assertEquals(true, res);
+ }
+
+ // data.num is 10 (checked against "> 0" and "<= 10") and data.num1 is 50.7
+ // (checked against "= 50.7"); the filter is expected to accept the event.
+ @Test
+ public void testNumericFilter() {
+ String condition = "{\n"
+ + " \"data\":{\n"
+ + " \"num\":[\n"
+ + " {\n"
+ + " \"numeric\":[\n"
+ + " \">\",\n"
+ + " 0,\n"
+ + " \"<=\",\n"
+ + " 10\n"
+ + " ]\n"
+ + " }\n"
+ + " ],\n"
+ + " \"num1\":[\n"
+ + " {\n"
+ + " \"numeric\":[\n"
+ + " \"=\",\n"
+ + " 50.7\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ Pattern pattern = PatternBuilder.build(condition);
+ Boolean res = pattern.filter(event);
+ Assertions.assertEquals(true, res);
+ }
+
+ // data.state is present in the event, so "exists": false must reject it.
+ @Test
+ public void testExistsFilter() {
+ String condition = "{\n"
+ + " \"data\":{\n"
+ + " \"state\":[\n"
+ + " {\n"
+ + " \"exists\": false\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ Pattern pattern = PatternBuilder.build(condition);
+ Boolean res = pattern.filter(event);
+ Assertions.assertEquals(false, res);
+ }
+
+ // data.state equals "enable", the excluded value, so the event is rejected.
+ @Test
+ public void testAnythingButFilter() {
+ String condition = "{\n"
+ + " \"data\":{\n"
+ + " \"state\":[\n"
+ + " {\n"
+ + " \"anything-but\": \"enable\"\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + "}";
+ Pattern pattern = PatternBuilder.build(condition);
+ Boolean res = pattern.filter(event);
+ Assertions.assertEquals(false, res);
+ }
+
+}
diff --git
a/eventmesh-common/src/test/java/org/apache/eventmesh/common/transform/TransformTest.java
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/transform/TransformTest.java
new file mode 100644
index 000000000..f91681a42
--- /dev/null
+++
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/transform/TransformTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Tests for the ORIGINAL, CONSTANT and TEMPLATE transformers, each created
+ * through {@link TransformerBuilder} and applied to a fixed sample event.
+ */
+public class TransformTest {
+
+ // Sample event; its "data" object holds a string, a number, a boolean and
+ // a JSON null so that each value kind can be extracted.
+ public static final String EVENT = "{\n"
+ + "\"id\": \"5b26115b-73e-cf74a******\",\n"
+ + " \"specversion\": \"1.0\",\n"
+ + "\"source\": \"apache.eventmesh\",\n"
+ + "\"type\": \"object:test\",\n"
+ + "\"datacontenttype\": \"application/json\",\n"
+ + "\"subject\": \"xxx.jpg\",\n"
+ + "\"time\": \"2023-09-17T12:07:48.955Z\",\n"
+ + "\"data\": {\n"
+ + "\"name\": \"test-transformer\",\n"
+ + "\"num\": 100 ,\n"
+ + "\"boolean\": true,\n"
+ + "\"nullV\": null\n"
+ + "}\n"
+ + " }";
+
+ // The original transformer must return the event unchanged, whether built
+ // through the Builder or through the static factory.
+ @Test
+ public void testOriginalTransformer() throws JsonProcessingException {
+
+ Transformer transformer = new
TransformerBuilder.Builder(TransformerType.ORIGINAL).build();
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals(EVENT, output);
+
+ Transformer transformer1 =
TransformerBuilder.buildOriginalTransformer();
+ String output1 = transformer1.transform(EVENT);
+ Assertions.assertEquals(EVENT, output1);
+ }
+
+ // The constant transformer must return its configured content regardless
+ // of the event.
+ @Test
+ public void testConstantTransformer() throws JsonProcessingException {
+ Transformer transformer = new
TransformerBuilder.Builder(TransformerType.CONSTANT).setContent("constant
test").build();
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("constant test", output);
+
+ Transformer transformer1 =
TransformerBuilder.buildConstantTransformer("constant test");
+ String output1 = transformer1.transform(EVENT);
+ Assertions.assertEquals("constant test", output1);
+
+ }
+
+ // A string value extracted via JSONPath is inserted into the template.
+ @Test
+ public void testTemplateTransFormerWithStringValue() throws
JsonProcessingException {
+ String content = "{\"data-name\":\"$.data.name\"}";
+ String template = "Transformers test:data name is ${data-name}";
+ Transformer transform =
TransformerBuilder.buildTemplateTransFormer(content, template);
+ String output = transform.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data name is
test-transformer", output);
+
+ Transformer transformer1 = new
TransformerBuilder.Builder(TransformerType.TEMPLATE)
+ .setContent(content)
+ .setTemplate(template).build();
+ String output1 = transformer1.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data name is
test-transformer", output1);
+
+ }
+
+ // With no variables defined, the placeholder is expected to stay in the output.
+ @Test
+ public void testTemplateTransFormerWithNullContent() throws
JsonProcessingException {
+ String content = "{}";
+ String template = "Transformers test:data num is ${data-num}";
+ Transformer transformer =
TransformerBuilder.buildTemplateTransFormer(content, template);
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data num is ${data-num}",
output);
+ }
+
+ // A path that matches nothing in the event is rendered as the text "null".
+ @Test
+ public void testTemplateTransFormerWithNoMatchContent() throws
JsonProcessingException {
+ String extractJson = "{\"data-num\":\"$.data.no\"}";
+ String template = "Transformers test:data num is ${data-num}";
+ Transformer transformer =
TransformerBuilder.buildTemplateTransFormer(extractJson, template);
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data num is null", output);
+ }
+
+ // A numeric value is rendered in its string form.
+ @Test
+ public void testTemplateTransFormerWithMatchNumValue() throws
JsonProcessingException {
+ String extractJson = "{\"data-num\":\"$.data.num\"}";
+ String template = "Transformers test:data num is ${data-num}";
+ Transformer transformer =
TransformerBuilder.buildTemplateTransFormer(extractJson, template);
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data num is 100", output);
+ }
+
+ // A JSON null value is rendered as the text "null".
+ @Test
+ public void testTemplateTransFormerWithMatchNullValue() throws
JsonProcessingException {
+ String content = "{\"data-null\":\"$.data.nullV\"}";
+ String template = "Transformers test:data null is ${data-null}";
+ Transformer transformer =
TransformerBuilder.buildTemplateTransFormer(content, template);
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data null is null", output);
+ }
+
+ // A boolean value is rendered in its string form.
+ @Test
+ public void testTemplateTransFormerWithMatchBooleanValue() throws
JsonProcessingException {
+ String extractJson = "{\"boolean\":\"$.data.boolean\"}";
+ String template = "Transformers test:data boolean is ${boolean}";
+ Transformer transformer =
TransformerBuilder.buildTemplateTransFormer(extractJson, template);
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data boolean is true",
output);
+ }
+
+ // A variable whose value is not a JSONPath expression ("constant") is
+ // substituted verbatim, alongside a variable resolved from the event.
+ @Test
+ public void testTemplateTransFormerWithConstant() throws
JsonProcessingException {
+ String extractJson =
"{\"name\":\"$.data.name\",\"constant\":\"constant\"" + "}";
+ String template = "Transformers test:data name is ${name}, constant is
${constant}";
+ Transformer transformer =
TransformerBuilder.buildTemplateTransFormer(extractJson, template);
+ String output = transformer.transform(EVENT);
+ Assertions.assertEquals("Transformers test:data name is
test-transformer, constant is constant",
+ output);
+ }
+
+}
diff --git a/tools/dependency-check/known-dependencies.txt
b/tools/dependency-check/known-dependencies.txt
index 7dd456ae7..6022c3fbd 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -1,7 +1,9 @@
+accessors-smart-2.4.7.jar
amqp-client-5.16.0.jar
animal-sniffer-annotations-1.19.jar
annotations-4.1.1.4.jar
aopalliance-1.0.jar
+asm-9.1.jar
asm-9.2.jar
asm-analysis-9.2.jar
asm-commons-9.2.jar
@@ -36,6 +38,7 @@ commons-digester-2.1.jar
commons-io-2.11.0.jar
commons-lang3-3.6.jar
commons-logging-1.2.jar
+commons-net-3.9.0.jar
commons-text-1.9.jar
commons-validator-1.7.jar
consul-api-1.4.5.jar
@@ -92,6 +95,8 @@ jjwt-jackson-0.11.1.jar
jna-4.2.2.jar
jodd-bean-5.1.6.jar
jodd-core-5.1.6.jar
+json-path-2.7.0.jar
+json-smart-2.4.7.jar
jsr305-3.0.2.jar
kafka-clients-3.0.0.jar
listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]