This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a717d3503 [WIP][ISSUE #4043]initial implementation of filter and 
transform (#4365)
a717d3503 is described below

commit a717d3503af42de7c8757cbc4627caa71a9ee23a
Author: FanYang <[email protected]>
AuthorDate: Wed Oct 25 16:17:07 2023 +0800

    [WIP][ISSUE #4043]initial implementation of filter and transform (#4365)
    
    * initial implementation -informal
    
    * initial implementation -informal
    
    * fix code style problem
    
    * remove unnecessary code snippets.
    
    * add license header
    
    * add some dep
    
    * fix JsonPathUtils checkstyle problem
    
    * fix test class checkstyle problem
    
    * fix CI problem
---
 eventmesh-common/build.gradle                      |   5 +
 .../eventmesh/common/filter/PatternEntry.java      |  68 ++++++++
 .../filter/condition/AnythingButCondition.java     |  71 ++++++++
 .../common/filter/condition/Condition.java         |  29 ++++
 .../common/filter/condition/ConditionsBuilder.java |  65 +++++++
 .../common/filter/condition/ExistsCondition.java   |  35 ++++
 .../common/filter/condition/NumericCondition.java  |  80 +++++++++
 .../common/filter/condition/PrefixxCondition.java  |  34 ++++
 .../filter/condition/SpecifiedCondition.java       |  34 ++++
 .../common/filter/condition/SuffixCondition.java   |  34 ++++
 .../eventmesh/common/filter/pattern/Pattern.java   |  87 ++++++++++
 .../common/filter/patternbuild/PatternBuilder.java | 189 +++++++++++++++++++++
 .../common/transform/ConstantTransformer.java      |  34 ++++
 .../eventmesh/common/transform/JsonPathParser.java |  88 ++++++++++
 .../common/transform/OriginalTransformer.java      |  26 +++
 .../eventmesh/common/transform/Template.java       |  47 +++++
 .../common/transform/TemplateTransformer.java      |  44 +++++
 .../eventmesh/common/transform/Transformer.java    |  32 ++++
 .../common/transform/TransformerBuilder.java       |  73 ++++++++
 .../common/transform/TransformerType.java          |  24 +++
 .../eventmesh/common/transform/Variable.java       |  30 ++++
 .../eventmesh/common/utils/JsonPathUtils.java      | 144 ++++++++++++++++
 .../eventmesh/common/filter/PatternTest.java       | 147 ++++++++++++++++
 .../eventmesh/common/transform/TransformTest.java  | 139 +++++++++++++++
 tools/dependency-check/known-dependencies.txt      |   5 +
 25 files changed, 1564 insertions(+)

diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle
index 8fd6dbf1e..f2c627acd 100644
--- a/eventmesh-common/build.gradle
+++ b/eventmesh-common/build.gradle
@@ -25,6 +25,11 @@ dependencies {
     api "org.apache.commons:commons-text"
     api "org.apache.commons:commons-lang3"
 
+    implementation group: 'com.jayway.jsonpath', name: 'json-path', version: 
'2.7.0'
+    implementation 'commons-net:commons-net:3.9.0'
+    api "io.cloudevents:cloudevents-core"
+    api "io.cloudevents:cloudevents-json-jackson"
+
     implementation "org.apache.logging.log4j:log4j-api"
     implementation "org.apache.logging.log4j:log4j-core"
     implementation "org.apache.logging.log4j:log4j-slf4j-impl"
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/PatternEntry.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/PatternEntry.java
new file mode 100644
index 000000000..b6f747e67
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/PatternEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter;
+
+import org.apache.eventmesh.common.filter.condition.Condition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * One entry of a filter pattern: a JSON path plus the conditions registered for that path.
+ */
+public class PatternEntry {
+
+    private final String patternPath;
+
+    private final List<Condition> conditionList = new ArrayList<>();
+
+    /**
+     * @param patternPath path string, returned unchanged by {@link #getPatternPath()}
+     */
+    public PatternEntry(final String patternPath) {
+        this.patternPath = patternPath;
+    }
+
+    /**
+     * Appends a condition; conditions are evaluated in the order they were added.
+     */
+    public void addRuleCondition(Condition patternCondition) {
+        conditionList.add(patternCondition);
+    }
+
+    public String getPatternName() {
+        // NOTE(review): fixed placeholder value, not derived from any field - confirm the intended name.
+        return "123";
+    }
+
+    public String getPatternPath() {
+        return this.patternPath;
+    }
+
+    /**
+     * Checks a value against the registered conditions.
+     *
+     * @param jsonElement value to test, handed as-is to each condition
+     * @return {@code true} as soon as one condition matches; {@code false} when none does,
+     *         including when no condition has been registered
+     */
+    public boolean match(JsonNode jsonElement) {
+        return conditionList.stream().anyMatch(candidate -> candidate.match(jsonElement));
+    }
+
+    /**
+     * Returns the condition list for test only
+     *
+     * @return the condition list
+     */
+    List<Condition> getConditionList() {
+        return conditionList;
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/AnythingButCondition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/AnythingButCondition.java
new file mode 100644
index 000000000..f64dd1fc4
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/AnythingButCondition.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches only when none of its wrapped conditions match.
+ */
+class AnythingButCondition implements Condition {
+
+    private List<Condition> conditionList = new ArrayList<>();
+
+    /**
+     * Collects the conditions to exclude from the rule value: a scalar, or each element of an
+     * array, becomes a {@link SpecifiedCondition}; each field of an object is turned into a
+     * condition by {@link ConditionsBuilder}, using the field name as the key.
+     */
+    AnythingButCondition(JsonNode condition) {
+        if (condition.isValueNode()) {
+            // single scalar
+            this.conditionList.add(new SpecifiedCondition(condition));
+        } else if (condition.isArray()) {
+            // []
+            for (JsonNode excluded : condition) {
+                this.conditionList.add(new SpecifiedCondition(excluded));
+            }
+        } else if (condition.isObject()) {
+            // prefix,suffix
+            Iterator<Entry<String, JsonNode>> fields = condition.fields();
+            while (fields.hasNext()) {
+                Entry<String, JsonNode> field = fields.next();
+                String conditionKey = field.getKey();
+                JsonNode conditionParams = field.getValue();
+                this.conditionList.add(new ConditionsBuilder().withKey(conditionKey).withParams(conditionParams).build());
+            }
+        }
+    }
+
+    /**
+     * @return {@code false} for a {@code null} input or when any wrapped condition matches it;
+     *         {@code true} otherwise
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        return inputEvent != null
+            && conditionList.stream().noneMatch(excluded -> excluded.match(inputEvent));
+    }
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/Condition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/Condition.java
new file mode 100644
index 000000000..4ae08616b
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/Condition.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * A single predicate of a filter rule, evaluated against one JSON value.
+ */
+public interface Condition {
+
+    /**
+     * Tests a value against this condition.
+     *
+     * @param inputEvent the JSON value to test; may be {@code null} when the addressed value is absent
+     * @return {@code true} if the value satisfies this condition
+     */
+    boolean match(JsonNode inputEvent);
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ConditionsBuilder.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ConditionsBuilder.java
new file mode 100644
index 000000000..bceb09d57
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ConditionsBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Fluent builder that maps a rule keyword and its parameters to a {@link Condition}.
+ */
+public class ConditionsBuilder {
+
+    private String key;
+    private JsonNode jsonNode;
+
+    /**
+     * Sets the rule keyword that selects the condition type.
+     */
+    public ConditionsBuilder withKey(String key) {
+        this.key = key;
+        return this;
+    }
+
+    /**
+     * Sets the JSON value passed to the condition's constructor.
+     */
+    public ConditionsBuilder withParams(JsonNode jsonNode) {
+        this.jsonNode = jsonNode;
+        return this;
+    }
+
+    /**
+     * Creates the condition selected by the key.
+     *
+     * @return the new condition
+     * @throws RuntimeException if the key is not one of the supported keywords
+     */
+    public Condition build() {
+        final JsonNode params = this.jsonNode;
+        switch (this.key) {
+            case "prefix":
+                return new PrefixxCondition(params);
+            case "suffix":
+                return new SuffixCondition(params);
+            case "anything-but":
+                return new AnythingButCondition(params);
+            case "numeric":
+                return new NumericCondition(params);
+            case "exists":
+                return new ExistsCondition(params);
+            case "specified":
+                return new SpecifiedCondition(params);
+            default:
+                throw new RuntimeException("INVALID CONDITION");
+        }
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ExistsCondition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ExistsCondition.java
new file mode 100644
index 000000000..95d765d71
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/ExistsCondition.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition on the presence of a value rather than on its content.
+ */
+class ExistsCondition implements Condition {
+
+    private JsonNode exists;
+
+    ExistsCondition(JsonNode exists) {
+        this.exists = exists;
+    }
+
+    /**
+     * @return {@code true} when the rule flag (read with {@code asBoolean()}) agrees with
+     *         whether {@code inputEvent} is non-null
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        final boolean shouldExist = this.exists.asBoolean();
+        final boolean present = inputEvent != null;
+        return shouldExist == present;
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/NumericCondition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/NumericCondition.java
new file mode 100644
index 000000000..b5c28a4d1
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/NumericCondition.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition made of (operator, number) pairs that must all hold for a numeric value.
+ *
+ * <p>The rule value is a flat array such as {@code [">", 0, "<=", 5]}.
+ */
+class NumericCondition implements Condition {
+
+    List<String> operators = new ArrayList<>();
+    List<Double> nums = new ArrayList<>();
+
+    /**
+     * @param numeric array alternating operator text and operand
+     * @throws RuntimeException if the value is not an array or has an odd number of elements
+     */
+    NumericCondition(JsonNode numeric) {
+        if (!numeric.isArray()) {
+            throw new RuntimeException("NUMERIC MUST BE ARRAY");
+        }
+        if (numeric.size() % 2 != 0) {
+            throw new RuntimeException("NUMERIC NO RIGHT FORMAT");
+        }
+        for (int i = 0; i < numeric.size(); i += 2) {
+            operators.add(numeric.get(i).asText());
+            nums.add(numeric.get(i + 1).asDouble());
+        }
+    }
+
+    /**
+     * Evaluates {@code target <opt> rule}; an unrecognised operator never matches.
+     */
+    private boolean compareNums(double rule, double target, String opt) {
+        int res = Double.compare(target, rule);
+        switch (opt) {
+            case "=":
+                return res == 0;
+            case "!=":
+                return res != 0;
+            case ">":
+                return res > 0;
+            case ">=":
+                return res >= 0;
+            case "<":
+                return res < 0;
+            case "<=":
+                return res <= 0;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * @param inputEvent value to test; may be {@code null}
+     * @return {@code true} only if the input is a JSON number and every (operator, number) pair holds;
+     *         {@code false} for {@code null} or non-numeric input
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // A missing or non-numeric value cannot satisfy a numeric rule; previously it matched (or threw on null).
+        if (inputEvent == null || !inputEvent.isNumber()) {
+            return false;
+        }
+        final double target = inputEvent.asDouble();
+        for (int i = 0; i < operators.size(); ++i) {
+            if (!compareNums(nums.get(i), target, operators.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/PrefixxCondition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/PrefixxCondition.java
new file mode 100644
index 000000000..22c1afa61
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/PrefixxCondition.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches when the text form of a value starts with a fixed prefix.
+ */
+class PrefixxCondition implements Condition {
+
+    private final String prefix;
+
+    /**
+     * @param prefix rule value; its {@code asText()} form is the required prefix
+     */
+    public PrefixxCondition(JsonNode prefix) {
+        this.prefix = prefix.asText();
+    }
+
+    /**
+     * @param inputEvent value to test; may be {@code null}
+     * @return {@code true} if the input is non-null and its text form starts with the prefix
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // An absent value arrives as null and cannot match; guard instead of throwing NPE.
+        return inputEvent != null && inputEvent.asText().startsWith(prefix);
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SpecifiedCondition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SpecifiedCondition.java
new file mode 100644
index 000000000..b6fa884a1
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SpecifiedCondition.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches when the text form of a value equals the text form of the rule value.
+ */
+class SpecifiedCondition implements Condition {
+
+    private final JsonNode specified;
+
+    SpecifiedCondition(JsonNode jsonNode) {
+        specified = jsonNode;
+    }
+
+    /**
+     * @param inputEvent value to test; may be {@code null}
+     * @return {@code true} if the input is non-null and {@code asText()} of both nodes is equal
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // An absent value arrives as null and cannot match; guard instead of throwing NPE.
+        return inputEvent != null && inputEvent.asText().equals(specified.asText());
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SuffixCondition.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SuffixCondition.java
new file mode 100644
index 000000000..b7187d6c1
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/condition/SuffixCondition.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.condition;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Condition that matches when the text form of a value ends with a fixed suffix.
+ */
+class SuffixCondition implements Condition {
+
+    private final String suffix;
+
+    /**
+     * @param suffix rule value; its {@code asText()} form is the required suffix
+     */
+    public SuffixCondition(JsonNode suffix) {
+        this.suffix = suffix.asText();
+    }
+
+    /**
+     * @param inputEvent value to test; may be {@code null}
+     * @return {@code true} if the input is non-null and its text form ends with the suffix
+     */
+    @Override
+    public boolean match(JsonNode inputEvent) {
+        // An absent value arrives as null and cannot match; guard instead of throwing NPE.
+        return inputEvent != null && inputEvent.asText().endsWith(this.suffix);
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/pattern/Pattern.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/pattern/Pattern.java
new file mode 100644
index 000000000..6187a668b
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/pattern/Pattern.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.pattern;
+
+import org.apache.eventmesh.common.filter.PatternEntry;
+import org.apache.eventmesh.common.utils.JsonPathUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.jayway.jsonpath.PathNotFoundException;
+
+/**
+ * A compiled filter rule: two lists of {@link PatternEntry} that must all match a JSON document.
+ */
+public class Pattern {
+
+    private List<PatternEntry> requiredFieldList = new ArrayList<>();
+    private List<PatternEntry> dataList = new ArrayList<>();
+
+    public void addRequiredFieldList(PatternEntry patternEntry) {
+        this.requiredFieldList.add(patternEntry);
+    }
+
+    public void addDataList(PatternEntry patternEntry) {
+        this.dataList.add(patternEntry);
+    }
+
+    /**
+     * Evaluates the rule against a JSON document.
+     *
+     * @param content JSON text to test
+     * @return {@code true} if every entry of both lists matches
+     * @throws RuntimeException wrapping a path lookup or JSON parsing failure
+     */
+    public boolean filter(String content) {
+        // content is passed down explicitly rather than stored in a field, so a call leaves no state behind.
+        return matchAll(content, requiredFieldList) && matchAll(content, dataList);
+    }
+
+    /** Returns true only if every entry matches; stops at the first failing entry. */
+    private boolean matchAll(String content, List<PatternEntry> entries) {
+        for (final PatternEntry patternEntry : entries) {
+            if (!matchEntry(content, patternEntry)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Resolves the entry's path in the content and tests the resulting value against the entry. */
+    private boolean matchEntry(String content, PatternEntry patternEntry) {
+        try {
+            JsonNode jsonElement = null;
+            String matchRes = JsonPathUtils.matchJsonPathValue(content, patternEntry.getPatternPath());
+            if (StringUtils.isNoneBlank(matchRes)) {
+                jsonElement = JsonPathUtils.parseStrict(matchRes);
+            }
+
+            if (jsonElement != null && jsonElement.isArray()) {
+                // An array value satisfies the entry when any element does. Previously a matching
+                // element ended the whole check with true (skipping later entries) and an array
+                // without a matching element was treated as a match.
+                for (JsonNode element : jsonElement) {
+                    if (patternEntry.match(element)) {
+                        return true;
+                    }
+                }
+                return false;
+            }
+
+            // jsonElement stays null when the lookup result is blank; the entry decides what that means.
+            return patternEntry.match(jsonElement);
+        } catch (PathNotFoundException | JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/patternbuild/PatternBuilder.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/patternbuild/PatternBuilder.java
new file mode 100644
index 000000000..4abf2af06
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/filter/patternbuild/PatternBuilder.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter.patternbuild;
+
+import org.apache.eventmesh.common.exception.JsonException;
+import org.apache.eventmesh.common.filter.PatternEntry;
+import org.apache.eventmesh.common.filter.condition.Condition;
+import org.apache.eventmesh.common.filter.condition.ConditionsBuilder;
+import org.apache.eventmesh.common.filter.pattern.Pattern;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import io.cloudevents.SpecVersion;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Parses the JSON text of a filter rule into a {@link Pattern}.
+ */
+public class PatternBuilder {
+
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    /**
+     * Builds a pattern from its JSON text.
+     *
+     * <p>The top-level {@code data} field is walked as a nested object. Every other top-level key must be
+     * contained in {@code SpecVersion.V1.getAllAttributes()} and carry a non-empty array of conditions.
+     *
+     * @param jsonStr JSON text of the rule
+     * @return the parsed pattern, or {@code null} when the parsed node is empty or not an object
+     * @throws JsonException if the text cannot be parsed, a key is not allowed, or a value has the wrong shape
+     */
+    public static Pattern build(String jsonStr) {
+
+        Pattern pattern = new Pattern();
+        JsonNode jsonNode;
+        try {
+            jsonNode = mapper.readTree(jsonStr);
+        } catch (Exception e) {
+            throw new JsonException("INVALID_JSON_STRING", e);
+        }
+
+        if (jsonNode.isEmpty() || !jsonNode.isObject()) {
+            return null;
+        }
+
+        // iter all json data
+        Iterator<Entry<String, JsonNode>> iterator = jsonNode.fields();
+        while (iterator.hasNext()) {
+            Map.Entry<String, JsonNode> entry = iterator.next();
+            String key = entry.getKey();
+            JsonNode value = entry.getValue();
+
+            if ("data".equals(key)) {
+                parseDataField(value, pattern);
+                continue;
+            }
+
+            if (!value.isArray()) {
+                throw new JsonException("INVALID_JSON_STRING");
+            }
+
+            if (!SpecVersion.V1.getAllAttributes().contains(key)) {
+                throw new JsonException("INVALID_JSON_KEY");
+            }
+
+            // Registers exactly one entry per attribute. The same steps used to be repeated inline
+            // after this call, which added every attribute entry to the pattern twice.
+            parseRequiredField("$." + key, value, pattern);
+        }
+
+        return pattern;
+
+    }
+
+    /**
+     * Walks the {@code data} object breadth-first and registers one entry per non-object field, using the
+     * dotted path from {@code $.data} as the entry path.
+     *
+     * @throws JsonException if {@code jsonNode} is not an object
+     */
+    private static void parseDataField(JsonNode jsonNode, Pattern pattern) {
+        if (!jsonNode.isObject()) {
+            throw new JsonException("INVALID_JSON_KEY");
+        }
+        Queue<Node> queueNode = new ArrayDeque<>();
+        queueNode.add(new Node("$.data", "data", jsonNode));
+        while (!queueNode.isEmpty()) {
+            Node ele = queueNode.poll();
+            String elepath = ele.getPath();
+            Iterator<Map.Entry<String, JsonNode>> iterator = ele.getValue().fields();
+            while (iterator.hasNext()) {
+                Map.Entry<String, JsonNode> entry = iterator.next();
+                // state
+                String key = entry.getKey();
+                // [{"anything-but":"initializing"}] [{"anything-but":123}]}
+                JsonNode value = entry.getValue();
+                String childPath = elepath + "." + key;
+                if (value.isObject()) {
+                    // nested object: descend later
+                    queueNode.add(new Node(childPath, key, value));
+                    continue;
+                }
+                PatternEntry patternEntry = new PatternEntry(childPath);
+                if (value.isArray()) {
+                    for (JsonNode conditionNode : value) {
+                        // {"anything-but":"initializing"}
+                        patternEntry.addRuleCondition(parseCondition(conditionNode));
+                    }
+                }
+                // NOTE(review): a scalar value yields an entry with no conditions, which can never match - confirm intent.
+                pattern.addDataList(patternEntry);
+            }
+        }
+
+    }
+
+    /**
+     * Turns one element of a condition array into a {@link Condition}: a scalar becomes a "specified"
+     * condition; otherwise the first field of the node supplies the keyword and its parameters.
+     *
+     * @throws JsonException if the node is neither a scalar nor a node with at least one field
+     */
+    private static Condition parseCondition(JsonNode jsonNode) {
+        if (jsonNode.isValueNode()) {
+            return new ConditionsBuilder().withKey("specified").withParams(jsonNode).build();
+        }
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = jsonNode.fields();
+        if (!iterator.hasNext()) {
+            // Fail at build time; a null condition would only surface later as an NPE while matching.
+            throw new JsonException("INVALID_PATTERN_VALUE ");
+        }
+        Map.Entry<String, JsonNode> entry = iterator.next();
+        // "anything-but"
+        String key = entry.getKey();
+        // "initializing"
+        JsonNode value = entry.getValue();
+        return new ConditionsBuilder().withKey(key).withParams(value).build();
+    }
+
+    /**
+     * Registers one required-field entry at {@code path} holding a condition per element of {@code jsonNode}.
+     *
+     * @throws JsonException if {@code jsonNode} is empty
+     */
+    private static void parseRequiredField(String path, JsonNode jsonNode, Pattern pattern) {
+        if (jsonNode.isEmpty()) {
+            // Empty array
+            throw new JsonException("INVALID_PATTERN_VALUE ");
+        }
+        PatternEntry patternEntry = new PatternEntry(path);
+        for (final JsonNode objNode : jsonNode) {
+            patternEntry.addRuleCondition(parseCondition(objNode));
+        }
+
+        pattern.addRequiredFieldList(patternEntry);
+
+    }
+
+    /** Work item of the breadth-first walk: a JSON node with the path and key it was found under. */
+    static class Node {
+
+        private String path;
+        private String key;
+        private JsonNode value;
+
+        Node(String path, String key, JsonNode value) {
+            this.path = path;
+            this.key = key;
+            this.value = value;
+        }
+
+        String getPath() {
+            return this.path;
+        }
+
+        String getKey() {
+            return this.key;
+        }
+
+        JsonNode getValue() {
+            return this.value;
+        }
+    }
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/ConstantTransformer.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/ConstantTransformer.java
new file mode 100644
index 000000000..e3df38e1e
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/ConstantTransformer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+
+/**
+ * {@link Transformer} that ignores its input and always returns the same string.
+ */
+class ConstantTransformer implements Transformer {
+
+    /** The fixed output; it is returned verbatim and never evaluated as a JsonPath. */
+    private final String constant;
+
+    /**
+     * @param constant the string every call to {@link #transform(String)} returns
+     */
+    ConstantTransformer(String constant) {
+        this.constant = constant;
+    }
+
+    /**
+     * Returns the configured constant.
+     *
+     * @param json ignored
+     * @return the constant supplied at construction
+     */
+    @Override
+    public String transform(String json) throws EventMeshException {
+        return this.constant;
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/JsonPathParser.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/JsonPathParser.java
new file mode 100644
index 000000000..d35a22205
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/JsonPathParser.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.utils.JsonPathUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Holds the variables declared by a transformer configuration and resolves them
+ * against event JSON.
+ */
+public class JsonPathParser {
+
+    protected List<Variable> variablesList = new ArrayList<>();
+
+    public List<Variable> getVariablesList() {
+        return variablesList;
+    }
+
+    /**
+     * Reads every field of the given JSON text into the variable list; the field name
+     * becomes the variable name and the field's text becomes its JsonPath.
+     *
+     * @param jsonPathString JSON text whose fields declare the variables
+     * @throws EventMeshException if a field holds something other than a scalar value
+     */
+    public JsonPathParser(String jsonPathString) {
+        JsonNode root = JsonPathUtils.parseStrict(jsonPathString);
+
+        for (Iterator<Map.Entry<String, JsonNode>> it = root.fields(); it.hasNext(); ) {
+            Map.Entry<String, JsonNode> field = it.next();
+            JsonNode fieldValue = field.getValue();
+            if (!fieldValue.isValueNode()) {
+                throw new EventMeshException("invalid config:" + jsonPathString);
+            }
+            variablesList.add(new Variable(field.getKey(), fieldValue.asText()));
+        }
+    }
+
+    /**
+     * Resolves the declared variables against {@code json}.
+     *
+     * <p>A variable whose JsonPath is valid and definite is replaced by a new variable
+     * carrying the matched text; any other variable is passed through unchanged.
+     *
+     * @param json event JSON to read from
+     * @return the resolved variables in declaration order, or an empty list when
+     *         {@code json} is null or empty
+     */
+    public List<Variable> match(String json) throws JsonProcessingException {
+        if (json == null || json.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        List<Variable> resolved = new ArrayList<>(variablesList.size());
+        for (Variable declared : variablesList) {
+            String path = declared.getJsonPath();
+            if (!JsonPathUtils.isValidAndDefinite(path)) {
+                // Not a usable JsonPath: keep the declared text as the value.
+                resolved.add(declared);
+                continue;
+            }
+            String matched = JsonPathUtils.matchJsonPathValueWithString(json, path);
+            resolved.add(new Variable(declared.getName(), matched));
+        }
+        return resolved;
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/OriginalTransformer.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/OriginalTransformer.java
new file mode 100644
index 000000000..708a2aeb2
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/OriginalTransformer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+/**
+ * {@link Transformer} that passes its input through untouched.
+ */
+class OriginalTransformer implements Transformer {
+
+    /**
+     * @param json the input string
+     * @return the same {@code json} reference that was passed in
+     */
+    @Override
+    public String transform(String json) {
+        return json;
+    }
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Template.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Template.java
new file mode 100644
index 000000000..348ed1a93
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Template.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+
+import org.apache.commons.text.StringSubstitutor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * A template string whose placeholders are filled in from a list of {@link Variable}s
+ * using {@link StringSubstitutor}.
+ */
+@Data
+@AllArgsConstructor
+public class Template {
+
+    private String template;
+
+    /**
+     * Replaces the placeholders of this template with the supplied variable values.
+     *
+     * <p>Each variable contributes its name as the placeholder key and the string held in
+     * its {@code jsonPath} property as the replacement; variables whose value is
+     * {@code null} are skipped.
+     *
+     * @param variables variables providing the replacement values
+     * @return the template with the known placeholders replaced
+     */
+    public String substitute(List<Variable> variables) throws EventMeshException {
+
+        Map<String, String> valuesMap = variables.stream()
+            .filter(variable -> variable.getJsonPath() != null)
+            .collect(Collectors.toMap(Variable::getName, Variable::getJsonPath));
+        StringSubstitutor sub = new StringSubstitutor(valuesMap);
+        // Replacement values can come from event content. Insert them literally so that a
+        // value containing "${...}" is not expanded again (or rejected as a cyclic reference).
+        sub.setDisableSubstitutionInValues(true);
+
+        return sub.replace(template);
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TemplateTransformer.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TemplateTransformer.java
new file mode 100644
index 000000000..ea42d05de
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TemplateTransformer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import java.util.List;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * {@link Transformer} that resolves variables from the input JSON and renders them
+ * into a {@link Template}.
+ */
+class TemplateTransformer implements Transformer {
+
+    private final JsonPathParser jsonPathParser;
+
+    private final Template template;
+
+    /**
+     * @param jsonPathParser parser holding the variable declarations
+     * @param template       template the resolved variables are rendered into
+     */
+    TemplateTransformer(JsonPathParser jsonPathParser, Template template) {
+        this.jsonPathParser = jsonPathParser;
+        this.template = template;
+    }
+
+    /**
+     * Resolves the variables against {@code json}, then substitutes them into the template.
+     *
+     * @param json input JSON
+     * @return the rendered template
+     */
+    @Override
+    public String transform(String json) throws JsonProcessingException {
+        List<Variable> resolvedVariables = jsonPathParser.match(json);
+        return template.substitute(resolvedVariables);
+    }
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Transformer.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Transformer.java
new file mode 100644
index 000000000..fecfe4b18
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Transformer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * EventMesh transformer interface. The available implementations are:
+ * 1. Constant
+ * 2. Original
+ * 3. Template
+ */
+public interface Transformer {
+
+    /**
+     * Produces the output string for the given input.
+     *
+     * @param json input string handed to the transformer
+     * @return the transformed string
+     * @throws JsonProcessingException declared so implementations can report JSON processing failures
+     */
+    String transform(String json) throws JsonProcessingException;
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerBuilder.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerBuilder.java
new file mode 100644
index 000000000..0eafdebb7
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+
+/**
+ * Factory for the {@link Transformer} implementations, offered both as static factory
+ * methods and as a fluent {@link Builder}.
+ */
+public class TransformerBuilder {
+
+    /**
+     * Fluent builder that selects the transformer implementation from a {@link TransformerType}.
+     */
+    public static final class Builder {
+
+        private final TransformerType transformerType;
+        private String template;
+        private String content;
+
+        /**
+         * @param transformerType kind of transformer {@link #build()} should create
+         */
+        public Builder(TransformerType transformerType) {
+            this.transformerType = transformerType;
+        }
+
+        /**
+         * @param content the constant for CONSTANT, or the variable declaration JSON for TEMPLATE
+         * @return this builder
+         */
+        public Builder setContent(String content) {
+            this.content = content;
+            return this;
+        }
+
+        /**
+         * @param template template text, used only for TEMPLATE
+         * @return this builder
+         */
+        public Builder setTemplate(String template) {
+            this.template = template;
+            return this;
+        }
+
+        /**
+         * Creates the transformer matching the configured type.
+         *
+         * @return a new transformer
+         * @throws EventMeshException if the type is {@code null} or not supported
+         */
+        public Transformer build() {
+            if (this.transformerType == null) {
+                // switch on a null enum would raise a bare NullPointerException
+                throw new EventMeshException("invalid config");
+            }
+            switch (this.transformerType) {
+                case CONSTANT:
+                    return buildConstantTransformer(this.content);
+                case ORIGINAL:
+                    return buildOriginalTransformer();
+                case TEMPLATE:
+                    return buildTemplateTransFormer(this.content, this.template);
+                default:
+                    throw new EventMeshException("invalid config");
+            }
+        }
+
+    }
+
+    /**
+     * Creates a template transformer.
+     *
+     * @param jsonContent JSON text declaring the variables, parsed by {@link JsonPathParser}
+     * @param template    template text the variables are rendered into
+     * @return a new template transformer
+     */
+    public static Transformer buildTemplateTransFormer(String jsonContent, String template) {
+        JsonPathParser jsonPathParser = new JsonPathParser(jsonContent);
+        Template templateEntry = new Template(template);
+        return new TemplateTransformer(jsonPathParser, templateEntry);
+    }
+
+    /**
+     * @param constant string the transformer always returns
+     * @return a new constant transformer
+     */
+    public static Transformer buildConstantTransformer(String constant) {
+        return new ConstantTransformer(constant);
+    }
+
+    /**
+     * @return a new transformer that returns its input unchanged
+     */
+    public static Transformer buildOriginalTransformer() {
+        return new OriginalTransformer();
+    }
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerType.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerType.java
new file mode 100644
index 000000000..13a5c05d4
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/TransformerType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+/**
+ * Kinds of transformer that {@code TransformerBuilder.Builder} can create.
+ */
+public enum TransformerType {
+    // built via TransformerBuilder.buildOriginalTransformer()
+    ORIGINAL,
+    // built via TransformerBuilder.buildConstantTransformer(content)
+    CONSTANT,
+    // built via TransformerBuilder.buildTemplateTransFormer(content, template)
+    TEMPLATE
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Variable.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Variable.java
new file mode 100644
index 000000000..b7f139f2b
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/transform/Variable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * A named string used by the transform package.
+ *
+ * <p>The {@code jsonPath} property serves two roles: {@code JsonPathParser} stores the
+ * configured field text in it at construction, and {@code JsonPathParser.match} creates new
+ * instances that carry the matched value in the same property. {@code Template.substitute}
+ * reads it as the replacement value.
+ */
+@Data
+@AllArgsConstructor
+public class Variable {
+
+    // placeholder name
+    private String name;
+    // configured path text, or the resolved value after matching
+    private String jsonPath;
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonPathUtils.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonPathUtils.java
new file mode 100644
index 000000000..175b322a8
--- /dev/null
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/JsonPathUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.exception.JsonException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.InvalidPathException;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.ReadContext;
+import com.jayway.jsonpath.internal.path.CompiledPath;
+import com.jayway.jsonpath.internal.path.PathCompiler;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+
+/**
+ * Static helpers for parsing JSON with Jackson and for evaluating JsonPath expressions
+ * against JSON strings.
+ */
+public class JsonPathUtils {
+
+    public static final String JSONPATH_SPLIT = "\\.";
+    public static final String JSONPATH_PREFIX = "$";
+    public static final String JSONPATH_PREFIX_WITH_POINT = "$.";
+    public static final String JSONPATH_DATA = "$.data";
+    private static final ObjectMapper STRICT_OBJECT_MAPPER = new ObjectMapper();
+
+    private static final Configuration JSON_PATH_CONFIG = Configuration.builder()
+        .jsonProvider(new JacksonJsonProvider(STRICT_OBJECT_MAPPER))
+        .build();
+
+    /**
+     * Tells whether the given text parses to a JSON object without fields.
+     *
+     * @param jsonString JSON text to inspect
+     * @return {@code true} only when the parsed root is an object and is empty
+     * @throws JsonException if any exception is raised while parsing or inspecting the text
+     */
+    public static boolean isEmptyJsonObject(String jsonString) {
+        try {
+            JsonNode jsonNode = STRICT_OBJECT_MAPPER.readTree(jsonString);
+            return jsonNode.isObject() && jsonNode.isEmpty();
+        } catch (Exception e) {
+            throw new JsonException("INVALID_JSON_STRING", e);
+        }
+    }
+
+    /**
+     * Parses the text into a JSON tree and rejects input that has further tokens after
+     * the first root value.
+     *
+     * @param json JSON text
+     * @return the parsed root node
+     * @throws JsonException if parsing fails or trailing tokens are present; the trailing-token
+     *                       failure is reported as the cause of the thrown exception
+     */
+    public static JsonNode parseStrict(String json) throws JsonException {
+        // try-with-resources closes the parser on every exit path.
+        try (JsonParser parser = STRICT_OBJECT_MAPPER.getFactory().createParser(json)) {
+            JsonNode result = STRICT_OBJECT_MAPPER.readTree(parser);
+            if (parser.nextToken() != null) {
+                // Check if there are more tokens after reading the root object
+                throw new JsonException("Additional tokens found after parsing: " + json);
+            }
+            return result;
+        } catch (Exception e) {
+            throw new JsonException("Json is not valid in strict way: " + json, e);
+        }
+    }
+
+    /**
+     * Serializes a single key/value pair as a JSON object string.
+     *
+     * @param key   field name
+     * @param value field value
+     * @return the JSON text, or {@code null} when the value cannot be serialized
+     */
+    public static String buildJsonString(String key, Object value) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(key, value);
+
+        try {
+            // Reuse the shared mapper rather than building a new one per call.
+            return STRICT_OBJECT_MAPPER.writeValueAsString(map);
+        } catch (JsonProcessingException e) {
+            // Best effort: callers receive null when serialization fails.
+            return null;
+        }
+    }
+
+    /**
+     * Tells whether the text is a compilable JsonPath that starts with {@code $} and is definite.
+     *
+     * @param jsonPath candidate JsonPath expression
+     * @return {@code true} when the expression compiles and is definite; {@code false} for
+     *         null/empty text, a missing {@code $} prefix, or an invalid path
+     */
+    public static boolean isValidAndDefinite(String jsonPath) {
+        if (Strings.isNullOrEmpty(jsonPath) || !jsonPath.startsWith(JSONPATH_PREFIX)) {
+            return false;
+        }
+        CompiledPath compiledPath;
+        try {
+            compiledPath = (CompiledPath) PathCompiler.compile(jsonPath);
+        } catch (InvalidPathException e) {
+            return false;
+        }
+        return compiledPath.isDefinite();
+    }
+
+    /**
+     * Reads the node addressed by {@code jsonPath} from {@code content} and returns its
+     * string form.
+     *
+     * @param content  JSON text to read from
+     * @param jsonPath JsonPath expression
+     * @return the string form of the matched node, or {@code null} when the read raises an
+     *         {@link InvalidPathException} or yields no value
+     * @throws EventMeshException if either argument is null or empty
+     */
+    public static String getJsonPathValue(String content, String jsonPath) {
+        if (Strings.isNullOrEmpty(content) || Strings.isNullOrEmpty(jsonPath)) {
+            throw new EventMeshException("invalid config:" + jsonPath);
+        }
+        Object obj;
+        try {
+            obj = JsonPath.using(JSON_PATH_CONFIG).parse(content).read(jsonPath, JsonNode.class);
+        } catch (InvalidPathException invalidPathException) {
+            return null;
+        }
+        // The read can yield null; guard before calling toString().
+        return obj == null ? null : obj.toString();
+    }
+
+    /**
+     * Converts JSON text into a Jackson tree.
+     *
+     * @param object JSON text
+     * @return the parsed node
+     * @throws JsonProcessingException if the text cannot be parsed
+     */
+    public static JsonNode convertToJsonNode(String object) throws JsonProcessingException {
+        return STRICT_OBJECT_MAPPER.readValue(object, JsonNode.class);
+    }
+
+    /**
+     * Evaluates {@code jsonPath} against {@code jsonString} and returns the result as text.
+     *
+     * @param jsonString JSON text to read from
+     * @param jsonPath   JsonPath expression
+     * @return {@code toString()} of the matched value, or the literal string {@code "null"}
+     *         when nothing was matched
+     * @throws EventMeshException if either argument is null or empty
+     */
+    public static String matchJsonPathValueWithString(String jsonString, String jsonPath) {
+        Object obj = jsonPathParse(jsonString, jsonPath);
+
+        if (obj == null) {
+            return "null";
+        }
+
+        return obj.toString();
+    }
+
+    /**
+     * Evaluates {@code jsonPath} against {@code jsonString}.
+     *
+     * @param jsonString JSON text to read from
+     * @param jsonPath   JsonPath expression
+     * @return the matched value, or {@code null} when the read raises an
+     *         {@link InvalidPathException}
+     * @throws EventMeshException if either argument is null or empty
+     */
+    public static Object jsonPathParse(String jsonString, String jsonPath) {
+        if (Strings.isNullOrEmpty(jsonPath) || Strings.isNullOrEmpty(jsonString)) {
+            throw new EventMeshException("invalid config:" + jsonPath);
+        }
+        try {
+            final ReadContext readContext = JsonPath.using(JSON_PATH_CONFIG).parse(jsonString);
+            return readContext.read(jsonPath);
+        } catch (InvalidPathException invalidPathException) {
+            return null;
+        }
+    }
+
+    /**
+     * Evaluates {@code jsonPath} against {@code jsonString} and serializes the result as JSON.
+     *
+     * @param jsonString JSON text to read from
+     * @param jsonPath   JsonPath expression
+     * @return the matched value written as JSON text
+     * @throws JsonProcessingException if the matched value cannot be serialized
+     * @throws EventMeshException      if either argument is null or empty
+     */
+    public static String matchJsonPathValue(String jsonString, String jsonPath) throws JsonProcessingException {
+        Object obj = jsonPathParse(jsonString, jsonPath);
+        return STRICT_OBJECT_MAPPER.writer().writeValueAsString(obj);
+    }
+}
diff --git 
a/eventmesh-common/src/test/java/org/apache/eventmesh/common/filter/PatternTest.java
 
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/filter/PatternTest.java
new file mode 100644
index 000000000..f5b14515f
--- /dev/null
+++ 
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/filter/PatternTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.filter;
+
+import org.apache.eventmesh.common.filter.pattern.Pattern;
+import org.apache.eventmesh.common.filter.patternbuild.PatternBuilder;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Exercises {@link Pattern#filter} with one rule of each condition kind against a fixed event.
+ */
+public class PatternTest {
+
+    private final String event = "{\n"
+        + "\"id\": \"4b26115b-73e-cf74a******\",\n"
+        + "        \"specversion\": \"1.0\",\n"
+        + "\"source\": \"eventmesh.source\",\n"
+        + "\"type\": \"object:put\",\n"
+        + "\"datacontenttype\": \"application/json\",\n"
+        + "\"subject\": \"xxx.jpg\",\n"
+        + "\"time\": \"2022-01-17T12:07:48.955Z\",\n"
+        + "\"data\": {\n"
+        + "\"name\": \"test01\",\n"
+        + "\"state\": \"enable\",\n"
+        + "\"num\": 10 ,\n"
+        + "\"num1\": 50.7 \n"
+        + "}\n"
+        + "    }";
+
+    /**
+     * A bare value inside the rule array is parsed as a "specified" condition, so this
+     * rule must match the event's exact source.
+     */
+    @Test
+    public void testSpecifiedFilter() {
+        String condition = "{\n"
+            + "    \"source\":[\n"
+            + "        \"eventmesh.source\"\n"
+            + "    ]\n"
+            + "}";
+        Pattern pattern = PatternBuilder.build(condition);
+        Boolean res = pattern.filter(event);
+        Assertions.assertEquals(true, res);
+    }
+
+    /** The event source starts with "eventmesh.", so the prefix rule matches. */
+    @Test
+    public void testPrefixFilter() {
+        String condition = "{\n"
+            + "    \"source\":[\n"
+            + "        {\n"
+            + "            \"prefix\":\"eventmesh.\"\n"
+            + "        }\n"
+            + "    ]\n"
+            + "}";
+        Pattern pattern = PatternBuilder.build(condition);
+        Boolean res = pattern.filter(event);
+        Assertions.assertEquals(true, res);
+    }
+
+    /** The event subject ends with ".jpg", so the suffix rule matches. */
+    @Test
+    public void testSuffixFilter() {
+        String condition = "{\n"
+            + "    \"subject\":[\n"
+            + "        {\n"
+            + "            \"suffix\":\".jpg\"\n"
+            + "        }\n"
+            + "    ]\n"
+            + "}";
+        Pattern pattern = PatternBuilder.build(condition);
+        Boolean res = pattern.filter(event);
+        Assertions.assertEquals(true, res);
+    }
+
+    /** data.num is 10 (0 &lt; 10 &lt;= 10) and data.num1 equals 50.7, so both numeric rules match. */
+    @Test
+    public void testNumericFilter() {
+        String condition = "{\n"
+            + "    \"data\":{\n"
+            + "        \"num\":[\n"
+            + "            {\n"
+            + "                \"numeric\":[\n"
+            + "                    \">\",\n"
+            + "                    0,\n"
+            + "                    \"<=\",\n"
+            + "                    10\n"
+            + "                ]\n"
+            + "            }\n"
+            + "        ],\n"
+            + "        \"num1\":[\n"
+            + "            {\n"
+            + "                \"numeric\":[\n"
+            + "                    \"=\",\n"
+            + "                    50.7\n"
+            + "                ]\n"
+            + "            }\n"
+            + "        ]\n"
+            + "    }\n"
+            + "}";
+        Pattern pattern = PatternBuilder.build(condition);
+        Boolean res = pattern.filter(event);
+        Assertions.assertEquals(true, res);
+    }
+
+    /** data.state is present in the event, so a rule requiring its absence must not match. */
+    @Test
+    public void testExistsFilter() {
+        String condition = "{\n"
+            + "    \"data\":{\n"
+            + "        \"state\":[\n"
+            + "            {\n"
+            + "               \"exists\": false\n"
+            + "            }\n"
+            + "         ]\n"
+            + "    }\n"
+            + "}";
+        Pattern pattern = PatternBuilder.build(condition);
+        Boolean res = pattern.filter(event);
+        Assertions.assertEquals(false, res);
+    }
+
+    /** data.state is "enable", which the anything-but rule excludes, so the filter rejects it. */
+    @Test
+    public void testAnythingButFilter() {
+        String condition = "{\n"
+            + "    \"data\":{\n"
+            + "        \"state\":[\n"
+            + "            {\n"
+            + "               \"anything-but\": \"enable\"\n"
+            + "            }\n"
+            + "         ]\n"
+            + "    }\n"
+            + "}";
+        Pattern pattern = PatternBuilder.build(condition);
+        Boolean res = pattern.filter(event);
+        Assertions.assertEquals(false, res);
+    }
+
+}
diff --git 
a/eventmesh-common/src/test/java/org/apache/eventmesh/common/transform/TransformTest.java
 
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/transform/TransformTest.java
new file mode 100644
index 000000000..f91681a42
--- /dev/null
+++ 
b/eventmesh-common/src/test/java/org/apache/eventmesh/common/transform/TransformTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.transform;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/**
+ * Tests for the transformers created through {@link TransformerBuilder}:
+ * ORIGINAL, CONSTANT and TEMPLATE.
+ */
+public class TransformTest {
+
+    /** Sample JSON event passed to every transformer under test. */
+    public static final String EVENT = "{\n"
+        + "\"id\": \"5b26115b-73e-cf74a******\",\n"
+        + "      \"specversion\": \"1.0\",\n"
+        + "\"source\": \"apache.eventmesh\",\n"
+        + "\"type\": \"object:test\",\n"
+        + "\"datacontenttype\": \"application/json\",\n"
+        + "\"subject\": \"xxx.jpg\",\n"
+        + "\"time\": \"2023-09-17T12:07:48.955Z\",\n"
+        + "\"data\": {\n"
+        + "\"name\": \"test-transformer\",\n"
+        + "\"num\": 100  ,\n"
+        + "\"boolean\": true,\n"
+        + "\"nullV\": null\n"
+        + "}\n"
+        + "    }";
+
+    /**
+     * An ORIGINAL transformer returns the event unchanged, whether it is
+     * obtained from the Builder or from the factory method.
+     */
+    @Test
+    public void testOriginalTransformer() throws JsonProcessingException {
+        Transformer transformer =
+            new TransformerBuilder.Builder(TransformerType.ORIGINAL)
+                .build();
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(EVENT, output);
+
+        Transformer transformer1 = TransformerBuilder
+            .buildOriginalTransformer();
+        String output1 = transformer1.transform(EVENT);
+        Assertions.assertEquals(EVENT, output1);
+    }
+
+    /**
+     * A CONSTANT transformer returns its configured content instead of the
+     * event, whether it is obtained from the Builder or the factory method.
+     */
+    @Test
+    public void testConstantTransformer() throws JsonProcessingException {
+        Transformer transformer =
+            new TransformerBuilder.Builder(TransformerType.CONSTANT)
+                .setContent("constant test")
+                .build();
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals("constant test", output);
+
+        Transformer transformer1 = TransformerBuilder
+            .buildConstantTransformer("constant test");
+        String output1 = transformer1.transform(EVENT);
+        Assertions.assertEquals("constant test", output1);
+    }
+
+    /**
+     * The string found at "$.data.name" replaces ${data-name} in the
+     * template, via both the factory method and the Builder.
+     */
+    @Test
+    public void testTemplateTransFormerWithStringValue()
+        throws JsonProcessingException {
+        String content = "{\"data-name\":\"$.data.name\"}";
+        String template = "Transformers test:data name is ${data-name}";
+        Transformer transform = TransformerBuilder
+            .buildTemplateTransFormer(content, template);
+        String output = transform.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data name is test-transformer", output);
+
+        Transformer transformer1 =
+            new TransformerBuilder.Builder(TransformerType.TEMPLATE)
+                .setContent(content)
+                .setTemplate(template)
+                .build();
+        String output1 = transformer1.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data name is test-transformer", output1);
+    }
+
+    /**
+     * With an empty extraction mapping ("{}") nothing is substituted, so
+     * the ${data-num} placeholder stays in the output as-is.
+     */
+    @Test
+    public void testTemplateTransFormerWithNullContent()
+        throws JsonProcessingException {
+        String content = "{}";
+        String template = "Transformers test:data num is ${data-num}";
+        Transformer transformer = TransformerBuilder
+            .buildTemplateTransFormer(content, template);
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data num is ${data-num}", output);
+    }
+
+    /**
+     * A path that is absent from the event ("$.data.no") renders the
+     * placeholder as the text "null".
+     */
+    @Test
+    public void testTemplateTransFormerWithNoMatchContent()
+        throws JsonProcessingException {
+        String extractJson = "{\"data-num\":\"$.data.no\"}";
+        String template = "Transformers test:data num is ${data-num}";
+        Transformer transformer = TransformerBuilder
+            .buildTemplateTransFormer(extractJson, template);
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data num is null", output);
+    }
+
+    /** A numeric value ("$.data.num") is rendered as its digits. */
+    @Test
+    public void testTemplateTransFormerWithMatchNumValue()
+        throws JsonProcessingException {
+        String extractJson = "{\"data-num\":\"$.data.num\"}";
+        String template = "Transformers test:data num is ${data-num}";
+        Transformer transformer = TransformerBuilder
+            .buildTemplateTransFormer(extractJson, template);
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data num is 100", output);
+    }
+
+    /** A JSON null ("$.data.nullV") is rendered as the text "null". */
+    @Test
+    public void testTemplateTransFormerWithMatchNullValue()
+        throws JsonProcessingException {
+        String content = "{\"data-null\":\"$.data.nullV\"}";
+        String template = "Transformers test:data null is ${data-null}";
+        Transformer transformer = TransformerBuilder
+            .buildTemplateTransFormer(content, template);
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data null is null", output);
+    }
+
+    /** A boolean value ("$.data.boolean") is rendered as "true". */
+    @Test
+    public void testTemplateTransFormerWithMatchBooleanValue()
+        throws JsonProcessingException {
+        String extractJson = "{\"boolean\":\"$.data.boolean\"}";
+        String template = "Transformers test:data boolean is ${boolean}";
+        Transformer transformer = TransformerBuilder
+            .buildTemplateTransFormer(extractJson, template);
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data boolean is true", output);
+    }
+
+    /**
+     * A mapping value that is plain text rather than a "$." path
+     * ("constant") is substituted literally, alongside an extracted value.
+     */
+    @Test
+    public void testTemplateTransFormerWithConstant()
+        throws JsonProcessingException {
+        String extractJson =
+            "{\"name\":\"$.data.name\",\"constant\":\"constant\"}";
+        String template = "Transformers test:data name is ${name},"
+            + " constant is ${constant}";
+        Transformer transformer = TransformerBuilder
+            .buildTemplateTransFormer(extractJson, template);
+        String output = transformer.transform(EVENT);
+        Assertions.assertEquals(
+            "Transformers test:data name is test-transformer,"
+                + " constant is constant",
+            output);
+    }
+
+}
diff --git a/tools/dependency-check/known-dependencies.txt b/tools/dependency-check/known-dependencies.txt
index 7dd456ae7..6022c3fbd 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -1,7 +1,9 @@
+accessors-smart-2.4.7.jar
 amqp-client-5.16.0.jar
 animal-sniffer-annotations-1.19.jar
 annotations-4.1.1.4.jar
 aopalliance-1.0.jar
+asm-9.1.jar
 asm-9.2.jar
 asm-analysis-9.2.jar
 asm-commons-9.2.jar
@@ -36,6 +38,7 @@ commons-digester-2.1.jar
 commons-io-2.11.0.jar
 commons-lang3-3.6.jar
 commons-logging-1.2.jar
+commons-net-3.9.0.jar
 commons-text-1.9.jar
 commons-validator-1.7.jar
 consul-api-1.4.5.jar
@@ -92,6 +95,8 @@ jjwt-jackson-0.11.1.jar
 jna-4.2.2.jar
 jodd-bean-5.1.6.jar
 jodd-core-5.1.6.jar
+json-path-2.7.0.jar
+json-smart-2.4.7.jar
 jsr305-3.0.2.jar
 kafka-clients-3.0.0.jar
 listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to