This is an automated email from the ASF dual-hosted git repository.

chenguangsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c59c7f9c [ISSUE #4621] Implement TransformerEngine for EventMesh Transformer (#4622)
8c59c7f9c is described below

commit 8c59c7f9c4dbbf6943dc85a31f4db881f2282bd9
Author: mike_xwm <[email protected]>
AuthorDate: Thu Dec 7 21:32:00 2023 +0800

    [ISSUE #4621] Implement TransformerEngine for EventMesh Transformer (#4622)
    
    * [ISSUE #4621] Implement TransformerEngine for EventMesh Transformer
    
    * fix ci check error
---
 .../runtime/boot/EventMeshHTTPServer.java          |  10 ++
 .../eventmesh/runtime/boot/TransformerEngine.java  | 140 +++++++++++++++++++++
 .../http/processor/SendAsyncEventProcessor.java    |  10 ++
 .../protocol/http/push/AsyncHTTPPushRequest.java   |  16 +++
 .../eventmesh/transformer/JsonPathParser.java      |   4 +-
 .../org/apache/eventmesh/transformer/Template.java |   4 +-
 .../eventmesh/transformer/TemplateTransformer.java |   3 +-
 .../eventmesh/transformer/TransformerBuilder.java  |  43 ++-----
 .../{Template.java => TransformerParam.java}       |  41 +++---
 .../eventmesh/transformer/TransformerType.java     |  47 ++++++-
 .../org/apache/eventmesh/transformer/Variable.java |  14 +--
 .../eventmesh/transformer/TransformTest.java       |  13 +-
 12 files changed, 277 insertions(+), 68 deletions(-)

diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
index dc51f084a..cfe1ff14d 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/EventMeshHTTPServer.java
@@ -92,6 +92,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
     private FilterEngine filterEngine;
 
+    private TransformerEngine transformerEngine;
+
     private HttpRetryer httpRetryer;
 
     private transient RateLimiter msgRateLimiter;
@@ -137,6 +139,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
         filterEngine = new FilterEngine(metaStorage, producerManager, 
consumerManager);
 
+        transformerEngine = new TransformerEngine(metaStorage, 
producerManager, consumerManager);
+
         super.setHandlerService(new HandlerService());
         super.getHandlerService().setMetrics(this.getMetrics());
 
@@ -180,6 +184,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
 
         filterEngine.shutdown();
 
+        transformerEngine.shutdown();
+
         consumerManager.shutdown();
 
         httpClientPool.shutdown();
@@ -354,6 +360,10 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {
         return filterEngine;
     }
 
+    public TransformerEngine getTransformerEngine() {
+        return transformerEngine;
+    }
+
     public MetaStorage getMetaStorage() {
         return metaStorage;
     }
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java
new file mode 100644
index 000000000..245ddf488
--- /dev/null
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/boot/TransformerEngine.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.runtime.boot;
+
+import org.apache.eventmesh.api.meta.MetaServiceListener;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.common.utils.LogUtils;
+import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
+import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
+import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
+import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
+import org.apache.eventmesh.runtime.meta.MetaStorage;
+import org.apache.eventmesh.transformer.Transformer;
+import org.apache.eventmesh.transformer.TransformerBuilder;
+import org.apache.eventmesh.transformer.TransformerParam;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TransformerEngine {
+
+    /**
+     * key:group-topic
+     **/
+    private final Map<String, Transformer> transformerMap = new HashMap<>();
+
+    private final String transformerPrefix = "transformer-";
+
+    private final MetaStorage metaStorage;
+
+    private MetaServiceListener metaServiceListener;
+
+    private final ProducerManager producerManager;
+
+    private final ConsumerManager consumerManager;
+
+    private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+
+    public TransformerEngine(MetaStorage metaStorage, ProducerManager 
producerManager, ConsumerManager consumerManager) {
+        this.metaStorage = metaStorage;
+        this.producerManager = producerManager;
+        this.consumerManager = consumerManager;
+    }
+
+    public void start() {
+        Map<String, String> transformerMetaData = 
metaStorage.getMetaData(transformerPrefix, true);
+        for (Entry<String, String> transformerDataEntry : 
transformerMetaData.entrySet()) {
+            // transformer-group
+            String key = transformerDataEntry.getKey();
+            // topic-transformerParam list
+            String value = transformerDataEntry.getValue();
+            updateTransformerMap(key, value);
+        }
+        metaServiceListener = this::updateTransformerMap;
+
+        // addListeners for producerManager & consumerManager
+        scheduledExecutorService.scheduleAtFixedRate(() -> {
+            ConcurrentHashMap<String, EventMeshProducer> producerMap = 
producerManager.getProducerTable();
+            for (String producerGroup : producerMap.keySet()) {
+                for (String transformerKey : transformerMap.keySet()) {
+                    if (!StringUtils.contains(transformerKey, producerGroup)) {
+                        addTransformerListener(producerGroup);
+                        LogUtils.info(log, "addTransformerListener for 
producer group: " + producerGroup);
+                    }
+                }
+            }
+            ConcurrentHashMap<String, ConsumerGroupManager> consumerMap = 
consumerManager.getClientTable();
+            for (String consumerGroup : consumerMap.keySet()) {
+                for (String transformerKey : transformerMap.keySet()) {
+                    if (!StringUtils.contains(transformerKey, consumerGroup)) {
+                        addTransformerListener(consumerGroup);
+                        LogUtils.info(log, "addTransformerListener for 
consumer group: " + consumerGroup);
+                    }
+                }
+            }
+        }, 10_000, 5_000, TimeUnit.MILLISECONDS);
+    }
+
+    private void updateTransformerMap(String key, String value) {
+        String group = StringUtils.substringAfter(key, transformerPrefix);
+
+        JsonNode transformerJsonNodeArray = JsonUtils.getJsonNode(value);
+
+        if (transformerJsonNodeArray != null) {
+            for (JsonNode transformerJsonNode : transformerJsonNodeArray) {
+                String topic = transformerJsonNode.get("topic").asText();
+                String transformerParam = 
transformerJsonNode.get("transformerParam").toString();
+                TransformerParam tfp = JsonUtils.parseObject(transformerParam, 
TransformerParam.class);
+                Transformer transformer = 
TransformerBuilder.buildTransformer(tfp);
+                transformerMap.put(group + "-" + topic, transformer);
+            }
+        }
+        addTransformerListener(group);
+    }
+
+    public void addTransformerListener(String group) {
+        String transformerKey = transformerPrefix + group;
+        try {
+            metaStorage.getMetaDataWithListener(metaServiceListener, 
transformerKey);
+        } catch (Exception e) {
+            throw new RuntimeException("addTransformerListener exception", e);
+        }
+    }
+
+    public void shutdown() {
+        scheduledExecutorService.shutdown();
+    }
+
+    public Transformer getTransformer(String key) {
+        return transformerMap.get(key);
+    }
+
+}
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
index 52f740151..34e4ffcb3 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java
@@ -45,6 +45,7 @@ import org.apache.eventmesh.runtime.core.protocol.producer.SendMessageContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.RemotingHelper;
 import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
+import org.apache.eventmesh.transformer.Transformer;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -161,6 +162,7 @@ public class SendAsyncEventProcessor implements AsyncHttpProcessor {
         final String topic = event.getSubject();
 
         Pattern filterPattern = 
eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + 
topic);
+        Transformer transformer = 
eventMeshHTTPServer.getTransformerEngine().getTransformer(producerGroup + "-" + 
topic);
 
         // validate body
         if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
@@ -252,6 +254,14 @@ public class SendAsyncEventProcessor implements AsyncHttpProcessor {
                 isFiltered = 
filterPattern.filter(JsonUtils.toJSONString(event));
             }
 
+            // apply transformer
+            if (isFiltered && transformer != null) {
+                String data = 
transformer.transform(JsonUtils.toJSONString(event));
+                event = 
CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
+                    .getBytes(StandardCharsets.UTF_8)).build();
+                sendMessageContext.setEvent(event);
+            }
+
             if (isFiltered) {
                 eventMeshProducer.send(sendMessageContext, new SendCallback() {
 
diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
index bee14c992..e86cda550 100644
--- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
+++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java
@@ -38,6 +38,7 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants;
 import 
org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
 import org.apache.eventmesh.runtime.util.EventMeshUtil;
 import org.apache.eventmesh.runtime.util.WebhookUtil;
+import org.apache.eventmesh.transformer.Transformer;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateFormatUtils;
@@ -53,6 +54,7 @@ import org.apache.http.util.EntityUtils;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -138,6 +140,20 @@ public class AsyncHTTPPushRequest extends AbstractHTTPPushRequest {
                 return;
             }
         }
+        Transformer transformer = eventMeshHTTPServer.getTransformerEngine()
+            .getTransformer(handleMsgContext.getConsumerGroup() + "-" + 
handleMsgContext.getTopic());
+        if (transformer != null) {
+            try {
+                String data = 
transformer.transform(JsonUtils.toJSONString(event));
+                event = 
CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
+                    .getBytes(StandardCharsets.UTF_8)).build();
+            } catch (Exception exception) {
+                LOGGER.warn("apply transformer to cloudevents error, group:{}, 
topic:{}, bizSeqNo={}, uniqueId={}",
+                    this.handleMsgContext.getConsumerGroup(),
+                    this.handleMsgContext.getTopic(), 
this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), 
exception);
+                return;
+            }
+        }
         handleMsgContext.setEvent(event);
         super.setEvent(event);
 
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java
index a220fc0ab..a0ebde12d 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java
@@ -72,8 +72,8 @@ public class JsonPathParser {
 
         List<Variable> variableList = new ArrayList<>(variablesList.size());
         for (Variable element : variablesList) {
-            if (JsonPathUtils.isValidAndDefinite(element.getJsonPath())) {
-                String res = JsonPathUtils.matchJsonPathValueWithString(json, 
element.getJsonPath());
+            if (JsonPathUtils.isValidAndDefinite(element.getValue())) {
+                String res = JsonPathUtils.matchJsonPathValueWithString(json, 
element.getValue());
                 Variable variable = new Variable(element.getName(), res);
                 variableList.add(variable);
             } else {
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java
index 5d2358dfe..19c3b5cec 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java
@@ -34,8 +34,8 @@ public class Template {
     public String substitute(List<Variable> variables) throws 
TransformException {
 
         Map<String, String> valuesMap = variables.stream()
-            .filter(variable -> variable.getJsonPath() != null)
-            .collect(Collectors.toMap(Variable::getName, 
Variable::getJsonPath));
+            .filter(variable -> variable.getValue() != null)
+            .collect(Collectors.toMap(Variable::getName, Variable::getValue));
         StringSubstitutor sub = new StringSubstitutor(valuesMap);
 
         return sub.replace(template);
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java
index 2a4db5fc3..bc9907ff4 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TemplateTransformer.java
@@ -37,8 +37,7 @@ class TemplateTransformer implements Transformer {
         // 1: get variable match results
         List<Variable> variableList = jsonPathParser.match(json);
         // 2: use results replace template
-        String res = template.substitute(variableList);
-        return res;
+        return template.substitute(variableList);
     }
 
 }
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java
index 05ad58355..6e007b34b 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java
@@ -19,41 +19,20 @@ package org.apache.eventmesh.transformer;
 
 public class TransformerBuilder {
 
-    public static final class Builder {
-
-        private final TransformerType transformerType;
-        private String template;
-        private String content;
-
-        public Builder(TransformerType transformerType) {
-            this.transformerType = transformerType;
-        }
-
-        public Builder setContent(String content) {
-            this.content = content;
-            return this;
-        }
-
-        public Builder setTemplate(String template) {
-            this.template = template;
-            return this;
-        }
-
-        public Transformer build() {
-            switch (this.transformerType) {
-                case CONSTANT:
-                    return buildConstantTransformer(this.content);
-                case ORIGINAL:
-                    return buildOriginalTransformer();
-                case TEMPLATE:
-                    return buildTemplateTransFormer(this.content, 
this.template);
-                default:
-                    throw new TransformException("invalid config");
-            }
+    public static Transformer buildTransformer(TransformerParam 
transformerParam) {
+        switch (transformerParam.getTransformerType()) {
+            case ORIGINAL:
+                return buildOriginalTransformer();
+            case CONSTANT:
+                return buildConstantTransformer(transformerParam.getValue());
+            case TEMPLATE:
+                return buildTemplateTransFormer(transformerParam.getValue(), 
transformerParam.getTemplate());
+            default:
+                throw new TransformException("invalid config");
         }
-
     }
 
+
     public static Transformer buildTemplateTransFormer(String jsonContent, 
String template) {
         JsonPathParser jsonPathParser = new JsonPathParser(jsonContent);
         Template templateEntry = new Template(template);
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java
similarity index 58%
copy from eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java
copy to eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java
index 5d2358dfe..d747d7be4 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Template.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerParam.java
@@ -17,29 +17,39 @@
 
 package org.apache.eventmesh.transformer;
 
-import org.apache.commons.text.StringSubstitutor;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-public class Template {
+public class TransformerParam {
 
+    private TransformerType transformerType;
+    private String value;
     private String template;
 
-    public Template(String template) {
+    public TransformerParam() {
+    }
+
+    public TransformerParam(TransformerType transformerType, String value, 
String template) {
+        this.transformerType = transformerType;
+        this.value = value;
         this.template = template;
     }
 
-    public String substitute(List<Variable> variables) throws 
TransformException {
+    public TransformerParam(TransformerType transformerType, String value) {
+        this(transformerType, value, null);
+    }
+
+    public TransformerType getTransformerType() {
+        return transformerType;
+    }
 
-        Map<String, String> valuesMap = variables.stream()
-            .filter(variable -> variable.getJsonPath() != null)
-            .collect(Collectors.toMap(Variable::getName, 
Variable::getJsonPath));
-        StringSubstitutor sub = new StringSubstitutor(valuesMap);
+    public void setTransformerType(TransformerType transformerType) {
+        this.transformerType = transformerType;
+    }
 
-        return sub.replace(template);
+    public String getValue() {
+        return value;
+    }
 
+    public void setValue(String value) {
+        this.value = value;
     }
 
     public String getTemplate() {
@@ -49,4 +59,5 @@ public class Template {
     public void setTemplate(String template) {
         this.template = template;
     }
-}
\ No newline at end of file
+
+}
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java
index 5ca2142c3..8ad06e48b 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java
@@ -17,8 +17,49 @@
 
 package org.apache.eventmesh.transformer;
 
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
 public enum TransformerType {
-    ORIGINAL,
-    CONSTANT,
-    TEMPLATE
+    ORIGINAL(1, "original"),
+    CONSTANT(2, "constant"),
+    TEMPLATE(3, "template");
+
+    private int code;
+
+    private String type;
+
+    TransformerType(int code, String type) {
+        this.code = code;
+        this.type = type;
+    }
+
+    @JsonCreator
+    public static TransformerType getItem(String type) {
+        for (TransformerType transformerType : values()) {
+            if (Objects.equals(transformerType.getType(), type)) {
+                return transformerType;
+            }
+        }
+        return null;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    @JsonValue
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
 }
diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java
index 6b3cc4da4..c9259d335 100644
--- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java
+++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/Variable.java
@@ -21,11 +21,11 @@ public class Variable {
 
     private String name;
 
-    private String jsonPath;
+    private String value;
 
-    public Variable(String name, String jsonPath) {
+    public Variable(String name, String value) {
         this.name = name;
-        this.jsonPath = jsonPath;
+        this.value = value;
     }
 
     public String getName() {
@@ -36,11 +36,11 @@ public class Variable {
         this.name = name;
     }
 
-    public String getJsonPath() {
-        return jsonPath;
+    public String getValue() {
+        return value;
     }
 
-    public void setJsonPath(String jsonPath) {
-        this.jsonPath = jsonPath;
+    public void setValue(String value) {
+        this.value = value;
     }
 }
diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java
index 2127e50ed..a55cde0ba 100644
--- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java
+++ b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java
@@ -42,8 +42,10 @@ public class TransformTest {
 
     @Test
     public void testOriginalTransformer() throws JsonProcessingException {
+        TransformerParam transformerParam = new TransformerParam();
+        transformerParam.setTransformerType(TransformerType.ORIGINAL);
 
-        Transformer transformer = new 
TransformerBuilder.Builder(TransformerType.ORIGINAL).build();
+        Transformer transformer = 
TransformerBuilder.buildTransformer(transformerParam);
         String output = transformer.transform(EVENT);
         Assertions.assertEquals(EVENT, output);
 
@@ -54,7 +56,8 @@ public class TransformTest {
 
     @Test
     public void testConstantTransformer() throws JsonProcessingException {
-        Transformer transformer = new 
TransformerBuilder.Builder(TransformerType.CONSTANT).setContent("constant 
test").build();
+        TransformerParam transformerParam = new 
TransformerParam(TransformerType.CONSTANT, "constant test");
+        Transformer transformer = 
TransformerBuilder.buildTransformer(transformerParam);
         String output = transformer.transform(EVENT);
         Assertions.assertEquals("constant test", output);
 
@@ -72,9 +75,9 @@ public class TransformTest {
         String output = transform.transform(EVENT);
         Assertions.assertEquals("Transformers test:data name is 
test-transformer", output);
 
-        Transformer transformer1 = new 
TransformerBuilder.Builder(TransformerType.TEMPLATE)
-            .setContent(content)
-            .setTemplate(template).build();
+        TransformerParam transformerParam = new 
TransformerParam(TransformerType.TEMPLATE, content, template);
+
+        Transformer transformer1 = 
TransformerBuilder.buildTransformer(transformerParam);
         String output1 = transformer1.transform(EVENT);
         Assertions.assertEquals("Transformers test:data name is 
test-transformer", output1);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to