This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new d30d9b8  Optimize fileObserver formate and Support module (#76)
d30d9b8 is described below

commit d30d9b8ff5bd8a1739e8b2380dcce7be688d9a31
Author: Artisan <[email protected]>
AuthorDate: Fri Apr 7 17:45:30 2023 +0800

    Optimize fileObserver formate and Support module (#76)
    
    Optimize fileObserver formate and Support module (#76)
---
 .../eventbridge/adapter/runtimer/Runtimer.java     |  3 +-
 .../adapter/runtimer/boot/EventBusListener.java    |  4 +-
 .../adapter/runtimer/boot/EventRuleTransfer.java   | 76 ++++++++++++++++------
 .../adapter/runtimer/boot/EventTargetPusher.java   | 18 +++--
 .../runtimer/boot/listener/CirculatorContext.java  | 60 ++++++++++++-----
 .../boot/listener/RocketMQEventSubscriber.java     | 48 ++++++++------
 .../runtimer/boot/transfer/TransformEngine.java    | 14 ++++
 .../runtimer/common/entity/TargetKeyValue.java     |  5 ++
 .../adapter/runtimer/config/RuntimerConfig.java    | 37 -----------
 .../runtimer/config/RuntimerConfigProps.java       | 53 +++++++++++++++
 .../service/TargetRunnerConfigOnFileObserver.java  |  5 +-
 .../src/main/resources/runtimer.properties         |  6 +-
 .../runtimer/src/main/resources/target-runner.json | 42 ++++++------
 supports/connect-cloudevent-transform/pom.xml      |  2 +-
 .../transform/eventbridge/CloudEventTransform.java |  8 ++-
 .../rocketmq/connect/CloudEventTransformTest.java  |  2 +-
 supports/connect-eventbridge-transform/pom.xml     |  2 +-
 .../eventbridge/EventBridgeTransform.java          |  8 ++-
 .../rocketmq/connect/EventBridgeTransformTest.java |  2 +-
 supports/connect-filter-transform/pom.xml          |  2 +-
 .../eventbridge/EventBridgeFilterTransform.java    |  8 ++-
 .../EventBridgeFilterTransformTest.java            |  2 +-
 22 files changed, 269 insertions(+), 138 deletions(-)

diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index f871e08..399fba9 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -25,6 +25,7 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.Circulator
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.AbstractTargetRunnerConfigObserver;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class Runtimer {
 
     private CirculatorContext circulatorContext;
 
-    private AbstractTargetRunnerConfigObserver runnerConfigObserver;
+    private TargetRunnerConfigObserver runnerConfigObserver;
 
     public Runtimer(CirculatorContext circulatorContext) {
         this.circulatorContext = circulatorContext;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index d9227d0..7a34aa3 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -36,9 +36,9 @@ public class EventBusListener extends ServiceThread {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventBusListener.class);
 
-    private CirculatorContext circulatorContext;
+    private final CirculatorContext circulatorContext;
 
-    private EventSubscriber eventSubscriber;
+    private final EventSubscriber eventSubscriber;
 
     public EventBusListener(CirculatorContext circulatorContext, 
EventSubscriber eventSubscriber) {
         this.circulatorContext = circulatorContext;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f9492f6..cbb3585 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -18,18 +18,24 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
 
 /**
  * receive event and transfer the rule to pusher
@@ -38,9 +44,7 @@ public class EventRuleTransfer extends ServiceThread {
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventRuleTransfer.class);
 
-    private CirculatorContext circulatorContext;
-
-    private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 
1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+    private final CirculatorContext circulatorContext;
 
     public EventRuleTransfer(CirculatorContext circulatorContext) {
         this.circulatorContext = circulatorContext;
@@ -54,27 +58,57 @@ public class EventRuleTransfer extends ServiceThread {
     @Override
     public void run() {
         while (!stopped) {
-            // add CompletableFuture
             ConnectRecord eventRecord = circulatorContext.takeEventRecord();
             if (Objects.isNull(eventRecord)) {
                 logger.info("listen eventRecord is empty, continue by curTime 
- {}", System.currentTimeMillis());
                 this.waitForRunning(1000);
                 continue;
             }
-            executorService.submit(() -> {
-                // extension add sub
-                // rule - target
-                
circulatorContext.getTaskTransformMap().entrySet().forEach(entry -> {
-                    TransformEngine<ConnectRecord> transformEngine = 
entry.getValue();
-                    ConnectRecord transformRecord = 
transformEngine.doTransforms(eventRecord);
-                    if (Objects.isNull(transformRecord)) {
-                        return;
-                    }
-                    // a bean for maintain
-                    circulatorContext.offerTargetTaskQueue(transformRecord);
-                    logger.debug("offer target task queue succeed, targetMap - 
{}", JSON.toJSONString(transformRecord));
-                });
+            Map<String, TransformEngine<ConnectRecord>> latestTransformMap = 
circulatorContext.getTaskTransformMap();
+            if(MapUtils.isEmpty(latestTransformMap)){
+                logger.warn("latest transform engine is empty, continue by 
curTime - {}", System.currentTimeMillis());
+                this.waitForRunning(3000);
+                continue;
+            }
+            // the event channel take rocket mq topic name as default
+            String eventChannelKey = RuntimerConfigDefine.CONNECT_TOPICNAME;
+            String eventChannel = eventRecord.getExtension(eventChannelKey);
+            Set<TransformEngine<ConnectRecord>> adaptTransformSet = 
latestTransformMap.values().stream()
+                    .filter(engine -> 
eventChannel.equals(engine.getConnectConfig(eventChannelKey)))
+                    .collect(Collectors.toSet());
+            if(CollectionUtils.isEmpty(adaptTransformSet)){
+                    logger.warn("adapt specific topic ref transform engine is 
empty, eventChannelName- {}", eventChannel);
+                    this.waitForRunning(3000);
+                    continue;
+            }
+            List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
+            List<CompletableFuture<Void>> completableFutures = 
Lists.newArrayList();
+            adaptTransformSet.forEach(transfer->{
+                CompletableFuture<Void> transformFuture = 
CompletableFuture.supplyAsync(()-> transfer.doTransforms(eventRecord))
+                        .exceptionally((exception) -> {
+                            logger.error("transfer do transform event record 
failed,stackTrace-", exception);
+                            return null;
+                        })
+                        .thenAccept(record-> {
+                            if(Objects.nonNull(record)){
+                                String runnerNameKey = 
RuntimerConfigDefine.RUNNER_NAME;
+                                String taskClassKey = 
RuntimerConfigDefine.TASK_CLASS;
+                                record.getExtensions().put(runnerNameKey, 
transfer.getConnectConfig(runnerNameKey));
+                                record.getExtensions().put(taskClassKey, 
transfer.getConnectConfig(taskClassKey));
+                                afterTransformConnect.add(record);
+                            }
+                        });
+                completableFutures.add(transformFuture);
             });
+
+            try{
+                CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[adaptTransformSet.size()])).get();
+                circulatorContext.offerTargetTaskQueue(afterTransformConnect);
+                logger.info("offer target task queues succeed, transforms - 
{}", JSON.toJSONString(afterTransformConnect));
+            }catch (Exception exception){
+                logger.error("transfer event record failed, stackTrace-", 
exception);
+            }
+
         }
     }
 }
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 9c17d2a..43e6b29 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
 
 /**
  * event target push to sink task
@@ -39,7 +40,7 @@ public class EventTargetPusher extends ServiceThread{
 
     private static final Logger logger = 
LoggerFactory.getLogger(EventTargetPusher.class);
 
-    private CirculatorContext circulatorContext;
+    private final CirculatorContext circulatorContext;
 
     public EventTargetPusher(CirculatorContext circulatorContext) {
         this.circulatorContext = circulatorContext;
@@ -58,11 +59,16 @@ public class EventTargetPusher extends ServiceThread{
                 logger.debug("start push content by pusher - {}", 
JSON.toJSONString(targetRecord));
             }
 
-            Map<String, SinkTask> latestTaskMap = 
circulatorContext.getPusherTaskMap();
-            String runnerName = 
targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
-            SinkTask sinkTask = latestTaskMap.get(runnerName);
-            // add thread pool
-            sinkTask.put(Lists.newArrayList(targetRecord));
+            ExecutorService executorService = 
circulatorContext.getExecutorService(targetRecord.getExtensions().getString(RuntimerConfigDefine.TASK_CLASS));
+            executorService.execute(() -> {
+                try {
+                    String runnerName = 
targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
+                    SinkTask sinkTask = 
circulatorContext.getPusherTaskMap().get(runnerName);;
+                    sinkTask.put(Lists.newArrayList(targetRecord));
+                }catch (Exception exception){
+                    logger.error(getServiceName() + " push target exception, 
record - " + targetRecord + " , stackTrace-", exception);
+                }
+            });
         }
     }
 
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
index 58e6b04..de371f7 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
@@ -17,9 +17,11 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
@@ -36,9 +38,7 @@ import org.springframework.stereotype.Component;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 
 /**
  * event circulator context for transfer and pusher
@@ -48,7 +48,7 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     private final static Logger logger = 
LoggerFactory.getLogger(LoggerName.EventBus_Listener);
 
-    private BlockingQueue<ConnectRecord> eventRecord = new 
LinkedBlockingQueue<>(50000);
+    private BlockingQueue<ConnectRecord> eventQueue = new 
LinkedBlockingQueue<>(50000);
 
     private BlockingQueue<ConnectRecord> targetQueue = new 
LinkedBlockingQueue<>(50000);
 
@@ -56,6 +56,8 @@ public class CirculatorContext implements 
TargetRunnerListener {
 
     private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new 
ConcurrentHashMap<>(20);
 
+    private Map<String/*PusherCLass*/, ExecutorService> pusherExecutorMap = 
new ConcurrentHashMap<>(10);
+
     private Plugin plugin;
 
     public CirculatorContext(Plugin plugin){
@@ -95,8 +97,8 @@ public class CirculatorContext implements 
TargetRunnerListener {
      *
      * @param connectRecords
      */
-   public boolean  offerEventRecords(List<ConnectRecord> connectRecords) {
-        return eventRecord.addAll(connectRecords);
+    public boolean offerEventRecords(List<ConnectRecord> connectRecords) {
+        return eventQueue.addAll(connectRecords);
     }
 
     /**
@@ -105,11 +107,11 @@ public class CirculatorContext implements 
TargetRunnerListener {
      * @return
      */
     public ConnectRecord takeEventRecord() {
-        if (eventRecord.isEmpty()) {
+        if (eventQueue.isEmpty()) {
             return null;
         }
         try {
-            return eventRecord.take();
+            return eventQueue.take();
         } catch (Exception exception) {
             logger.error("take event record exception - stack-> ", exception);
         }
@@ -124,8 +126,8 @@ public class CirculatorContext implements 
TargetRunnerListener {
         return pusherTaskMap;
     }
 
-    public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
-        return targetQueue.offer(connectRecord);
+    public boolean offerTargetTaskQueue(List<ConnectRecord> connectRecords) {
+        return targetQueue.addAll(connectRecords);
     }
 
     public ConnectRecord takeTargetMap() {
@@ -140,6 +142,15 @@ public class CirculatorContext implements 
TargetRunnerListener {
         return null;
     }
 
+    /**
+     * get specific thread pool by push name
+     * @param pushName
+     * @return
+     */
+    public ExecutorService getExecutorService(String pushName){
+        return pusherExecutorMap.get(pushName);
+    }
+
     /**
      * refresh target runner where config changed
      * @param targetRunnerConfig
@@ -147,17 +158,22 @@ public class CirculatorContext implements 
TargetRunnerListener {
      */
     private void refreshRunnerMetadata(TargetRunnerConfig targetRunnerConfig, 
RefreshTypeEnum refreshTypeEnum) {
         String runnerName = targetRunnerConfig.getName();
-        switch (refreshTypeEnum){
+        switch (refreshTypeEnum) {
             case ADD:
             case UPDATE:
-                for(Map<String, String> configMap : 
targetRunnerConfig.getComponents()){
-                    TargetKeyValue targetKeyValue = new 
TargetKeyValue(configMap);
-                    TransformEngine<ConnectRecord> transformChain = new 
TransformEngine<>(targetKeyValue, plugin);
-                    taskTransformMap.put(runnerName, transformChain);
+                TargetKeyValue targetKeyValue = new TargetKeyValue();
+                
targetRunnerConfig.getComponents().forEach(targetKeyValue::putAll);
+                TransformEngine<ConnectRecord> transformChain = new 
TransformEngine<>(targetKeyValue, plugin);
+                taskTransformMap.put(runnerName, transformChain);
+
+                SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
+                pusherTaskMap.put(runnerName, sinkTask);
 
-                    SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
-                    pusherTaskMap.put(runnerName, sinkTask);
+                String pusherClass = 
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+                if (StringUtils.isNotEmpty(pusherClass) && 
!pusherExecutorMap.containsKey(pusherClass)) {
+                    pusherExecutorMap.put(pusherClass, 
initDefaultThreadPoolExecutor(pusherClass));
                 }
+
                 break;
             case DELETE:
                 taskTransformMap.remove(runnerName);
@@ -168,6 +184,16 @@ public class CirculatorContext implements 
TargetRunnerListener {
         }
     }
 
+    /**
+     * init default thread poll param, support auto config
+     * @param threadPollName
+     * @return
+     */
+    private ExecutorService initDefaultThreadPoolExecutor(String 
threadPollName) {
+        ThreadFactoryBuilder threadFactory = new 
ThreadFactoryBuilder().setNameFormat(threadPollName);
+        return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS, new 
LinkedBlockingQueue<>(300), threadFactory.build());
+    }
+
     /**
      * init target sink task
      * @param targetKeyValue
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 4ca0507..1e504c1 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -27,6 +27,7 @@ import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -43,11 +44,10 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.io.support.PropertiesLoaderUtils;
-import org.springframework.util.CollectionUtils;
 
 import java.nio.charset.StandardCharsets;
 import java.util.*;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * RocketMQ implement event subscriber
@@ -58,12 +58,14 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     private DefaultLitePullConsumer pullConsumer;
 
-    private TargetRunnerConfigObserver runnerConfigObserver;
+    private final TargetRunnerConfigObserver runnerConfigObserver;
 
     private Integer pullTimeOut;
 
     private String namesrvAddr;
 
+    private Integer pullBatchSize;
+
     private static final String SEMICOLON = ";";
 
     private static final String SYS_DEFAULT_GROUP = 
"event-bridge-default-group";
@@ -100,25 +102,31 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
 
     @Override
     public List<ConnectRecord> pull() {
-        List<MessageExt> messageExts = pullConsumer.poll(pullTimeOut);
-        if (CollectionUtils.isEmpty(messageExts)) {
+        List<MessageExt> messages = pullConsumer.poll(pullTimeOut);
+        if (CollectionUtils.isEmpty(messages)) {
             logger.info("consumer poll message empty , consumer - {}", 
JSON.toJSONString(pullConsumer));
             return null;
         }
         List<ConnectRecord> connectRecords = Lists.newArrayList();
-        for (MessageExt messageExt : messageExts) {
-            ConnectRecord eventRecord = convertToSinkRecord(messageExt);
-            connectRecords.add(eventRecord);
-            if(logger.isDebugEnabled()){
-                logger.debug("offer listen event record -  {} - by message 
event- {}", eventRecord, messageExt);
-            }
-        }
+        List<CompletableFuture<Void>> completableFutures = 
Lists.newArrayList();
+        messages.forEach(item->{
+            CompletableFuture<Void> recordCompletableFuture = 
CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))
+                    .exceptionally((exception) -> {
+                        logger.error("execute completable job 
failed,stackTrace-", exception);
+                        return null;
+                    })
+                    .thenAccept(connectRecords::add);
+            completableFutures.add(recordCompletableFuture);
+        });
+
+        CompletableFuture.allOf(completableFutures.toArray(new 
CompletableFuture[messages.size()])).join();
+
         return connectRecords;
     }
 
     @Override
     public void commit(List<ConnectRecord> connectRecordList) {
-
+        // TODO
     }
 
     /**
@@ -127,17 +135,18 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
      * @return
      */
     public Set<String> parseTopicsByRunnerConfigs(Set<TargetRunnerConfig> 
targetRunnerConfigs){
-        
if(org.apache.commons.collections.CollectionUtils.isEmpty(targetRunnerConfigs)){
+        if(CollectionUtils.isEmpty(targetRunnerConfigs)){
             logger.warn("target runner config is empty, parse to topic 
failed!");
             return null;
         }
         Set<String> listenTopics = Sets.newHashSet();
         for(TargetRunnerConfig runnerConfig : targetRunnerConfigs){
             List<Map<String,String>> runnerConfigMap = 
runnerConfig.getComponents();
-            
if(org.apache.commons.collections.CollectionUtils.isEmpty(runnerConfigMap)){
+            if(CollectionUtils.isEmpty(runnerConfigMap)){
+                logger.warn("target runner config components is empty, config 
info - {}", runnerConfig);
                 continue;
             }
-            
listenTopics.addAll(runnerConfigMap.stream().map(item->item.get(RuntimerConfigDefine.CONNECT_TOPICNAME)).collect(Collectors.toSet()));
+            
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CONNECT_TOPICNAME));
         }
         return listenTopics;
     }
@@ -150,10 +159,10 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
             Properties properties = 
PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
             namesrvAddr = properties.getProperty("rocketmq.namesrvAddr");
             pullTimeOut = 
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
+            pullBatchSize = 
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize"));
         }catch (Exception exception){
-
+            logger.error("init rocket mq property exception, stack trace-", 
exception);
         }
-
     }
 
     /**
@@ -173,6 +182,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
         DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
         consumer.setConsumerGroup(createGroupName(SYS_DEFAULT_GROUP));
         consumer.setNamesrvAddr(namesrvAddr);
+        consumer.setPullBatchSize(pullBatchSize);
         try {
             for(String topic : topics){
                 consumer.subscribe(topic, "*");
@@ -214,7 +224,7 @@ public class RocketMQEventSubscriber extends 
EventSubscriber {
      */
     private MessageQueue parseMessageQueueList(String messageQueueStr) {
         List<String> messageQueueStrList = 
Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
-        if 
(org.apache.commons.collections.CollectionUtils.isEmpty(messageQueueStrList) || 
messageQueueStrList.size() != 3) {
+        if (CollectionUtils.isEmpty(messageQueueStrList) || 
messageQueueStrList.size() != 3) {
             return null;
         }
         return new MessageQueue(messageQueueStrList.get(0), 
messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index 9b59487..046d50d 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -91,6 +91,11 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
         });
     }
 
+    /**
+     * transform event record for target record
+     * @param connectRecord
+     * @return
+     */
     public R doTransforms(R connectRecord) {
         if (transformList.size() == 0) {
             return connectRecord;
@@ -105,6 +110,15 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
         return connectRecord;
     }
 
+    /**
+     * get task config value by key
+     * @param configKey
+     * @return
+     */
+    public String getConnectConfig(String configKey){
+        return config.getString(configKey);
+    }
+
     private Transform getTransform(String transformClass) throws Exception {
         ClassLoader loader = plugin.getPluginClassLoader(transformClass);
         final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
index 5d52128..956d48e 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
@@ -139,6 +139,11 @@ public class TargetKeyValue implements KeyValue, 
Serializable {
         this.properties = properties;
     }
 
+    public KeyValue putAll(Map<String,String> configProps){
+        this.properties.putAll(configProps);
+        return this;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
deleted file mode 100644
index af4cafa..0000000
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
-
-import lombok.Data;
-import org.springframework.beans.factory.annotation.Configurable;
-import org.springframework.beans.factory.annotation.Value;
-
-@Data
-@Configurable
-public class RuntimerConfig {
-
-    @Value("${rumtimer.name:}")
-    private String runtimeName;
-
-    @Value("${runtimer.pluginpath:}")
-    private String pluginPath;
-
-    @Value("${runtimer.storePathRootDir:}")
-    private String storePathRootDir;
-
-}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigProps.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigProps.java
new file mode 100644
index 0000000..4f7827c
--- /dev/null
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigProps.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.support.PropertiesLoaderUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * runtimer properties factory
+ */
+public class RuntimerConfigProps {
+
+    private final static Logger logger = 
LoggerFactory.getLogger(RuntimerConfigProps.class);
+
+    private Properties properties;
+
+    private RuntimerConfigProps(){
+        try {
+             properties = 
PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
+        } catch (IOException exception) {
+            logger.error("runtime load properties failed, stackTrace-", 
exception);
+        }
+    }
+
+    private static class  RuntimerConfigPropsHolder{
+        private static final RuntimerConfigProps instance = new 
RuntimerConfigProps();
+    }
+
+    public static RuntimerConfigProps build(){
+        return RuntimerConfigPropsHolder.instance;
+    }
+
+
+}
diff --git 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 8764694..63f1e9d 100644
--- 
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ 
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -50,7 +50,6 @@ public class TargetRunnerConfigOnFileObserver extends 
AbstractTargetRunnerConfig
 
     public static final String DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME = 
"target-runner.json";
 
-
     private static ScheduledExecutorService service = 
Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", 
false));
 
@@ -61,9 +60,7 @@ public class TargetRunnerConfigOnFileObserver extends 
AbstractTargetRunnerConfig
     }
 
     public TargetRunnerConfigOnFileObserver() {
-        if(StringUtils.isEmpty(pathName)){
-            this.pathName = getConfigFilePath();
-        }
+        this.pathName = getConfigFilePath();
         super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
         this.addListen(pathName, this);
     }
diff --git a/adapter/runtimer/src/main/resources/runtimer.properties 
b/adapter/runtimer/src/main/resources/runtimer.properties
index b5659c1..7b290e5 100644
--- a/adapter/runtimer/src/main/resources/runtimer.properties
+++ b/adapter/runtimer/src/main/resources/runtimer.properties
@@ -16,8 +16,12 @@
 ## rocketmq
 rocketmq.namesrvAddr=localhost:9876
 rocketmq.consumer.pullTimeOut = 3000
+rocketmq.consumer.pullBatchSize=20
 rocketmq.cluster.name=DefaultCluster
 ## runtimer
 rumtimer.name=eventbridge-runtimer
 runtimer.pluginpath=/Users/Local/eventbridge/plugin
-runtimer.storePathRootDir=/Users/Local/eventbridge/store
\ No newline at end of file
+runtimer.storePathRootDir=/Users/Local/eventbridge/store
+## listener
+listener.eventQueue.threshold=50000
+listener.targetQueue.threshold=50000
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.json 
b/adapter/runtimer/src/main/resources/target-runner.json
index 11b5088..e94e506 100644
--- a/adapter/runtimer/src/main/resources/target-runner.json
+++ b/adapter/runtimer/src/main/resources/target-runner.json
@@ -1,23 +1,23 @@
 [
-    {
-
-        "name":"xxxxx",
-        "components":[ {
-          "class" : "org.apache.rocketmq.connect.FileStream",
-          "path" : "xxxxxxxx",
-          "name" : "demo"
-        },
-        {
-          "class" : "org.apache.rocketmq.connect.transforms.PatternRename",
-          "pattern" : "company",
-          "replacement": "company02"
-        },
-       {
-          "class" : "org.apache.rocketmq.connect.HttpSinkTask",
-          "url" : "http://xxxxx/demo";
-        } ],
-        "runOptions":{
-            "taskSize":1
-        }
-    }
+  {
+    "name":"demo-runner",
+    "components":[
+      {
+        "runner-name": "demo-runner",
+        "connect-topicname":"eventbridge%654321%demo-bus_1678348282165"
+      },
+      {
+        "transforms":"filter,transform,",
+        "transforms-filter-filterPattern":"{}",
+        
"transforms-filter-class":"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform",
+        
"transforms-transform-data":"{\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\",\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\"}",
+        "transforms-transform-class": 
"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform"
+      },
+      {
+        
"task-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask",
+        
"webHook":"https://oapi.dingtalk.com/robot/send?access_token=7f78aa4734ea9bd245984e47b6764ccb950b4292e4f6f9424dff92909f485f16";,
+        
"secretKey":"SEC8a898c9df7b6415090a8f1341d9eed000c815a89f301f2de87302a1e802dbd69"
+      }
+    ]
+  }
 ]
\ No newline at end of file
diff --git a/supports/connect-cloudevent-transform/pom.xml 
b/supports/connect-cloudevent-transform/pom.xml
index 9e47a7f..88caf65 100644
--- a/supports/connect-cloudevent-transform/pom.xml
+++ b/supports/connect-cloudevent-transform/pom.xml
@@ -21,7 +21,7 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+        
<openmessaging-connector.version>0.1.2</openmessaging-connector.version>
         <cloudevents.version>2.3.0</cloudevents.version>
     </properties>
 
diff --git 
a/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
 
b/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
index 0e43b2a..6569478 100644
--- 
a/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
+++ 
b/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
 import io.cloudevents.SpecVersion;
 import io.cloudevents.core.v1.CloudEventV1;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.rocketmq.eventbridge.tools.transform.*;
 
@@ -167,7 +168,7 @@ public class CloudEventTransform implements 
io.openmessaging.connector.api.compo
     }
 
     @Override
-    public void start(KeyValue config) {
+    public void init(KeyValue config) {
         this.idTransform = buildTransform(config, CloudEventV1.ID);
         this.sourceTransform = buildTransform(config, CloudEventV1.SOURCE);
         this.specversionTransform = buildTransform(config, 
CloudEventV1.SPECVERSION);
@@ -177,6 +178,11 @@ public class CloudEventTransform implements 
io.openmessaging.connector.api.compo
         this.subjectTransform = buildTransform(config, CloudEventV1.SUBJECT);
     }
 
+    @Override
+    public void start(ComponentContext componentContext) {
+
+    }
+
     private Transform buildTransform(KeyValue config, String key) {
         String transformConfig = config.getString(key);
         if (transformConfig == null || Strings.isNullOrEmpty(transformConfig)) 
{
diff --git 
a/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
 
b/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
index 261604e..a9ed3c4 100644
--- 
a/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
+++ 
b/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
@@ -37,7 +37,7 @@ public class CloudEventTransformTest {
         keyValue.put("subject", 
"{\"value\":\"$.data.subject\",\"form\":\"JSONPATH\"}");
         keyValue.put("type", "{\"value\":\"$.type\",\"form\":\"JSONPATH\"}");
         CloudEventTransform cloudEventTransform = new CloudEventTransform();
-        cloudEventTransform.start(keyValue);
+        cloudEventTransform.init(keyValue);
 
         ConnectRecord record = new ConnectRecord(null, null, 
System.currentTimeMillis());
         record.addExtension("type", "type");
diff --git a/supports/connect-eventbridge-transform/pom.xml 
b/supports/connect-eventbridge-transform/pom.xml
index aa72f4d..02a5a8d 100644
--- a/supports/connect-eventbridge-transform/pom.xml
+++ b/supports/connect-eventbridge-transform/pom.xml
@@ -21,7 +21,7 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+        
<openmessaging-connector.version>0.1.2</openmessaging-connector.version>
         <gson.version>2.8.9</gson.version>
     </properties>
 
diff --git 
a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
 
b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
index bc98d4b..3b14021 100644
--- 
a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
+++ 
b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.transform.eventbridge;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.SchemaBuilder;
 import org.apache.rocketmq.eventbridge.tools.transform.*;
@@ -54,7 +55,7 @@ public class EventBridgeTransform implements 
io.openmessaging.connector.api.comp
     }
 
     @Override
-    public void start(KeyValue config) {
+    public void init(KeyValue config) {
         config.keySet()
                 .forEach(key -> {
                     TransformParam transformParam = new 
Gson().fromJson(config.getString(key), TransformParam.class);
@@ -62,6 +63,11 @@ public class EventBridgeTransform implements 
io.openmessaging.connector.api.comp
                 });
     }
 
+    @Override
+    public void start(ComponentContext componentContext) {
+
+    }
+
     @Override
     public void stop() {
 
diff --git 
a/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
 
b/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
index 31c2e96..8395709 100644
--- 
a/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
+++ 
b/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
@@ -38,7 +38,7 @@ public class EventBridgeTransformTest {
             
"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\","
                 + 
"\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}");
         EventBridgeTransform eventBridgeTransform = new EventBridgeTransform();
-        eventBridgeTransform.start(keyValue);
+        eventBridgeTransform.init(keyValue);
 
         ConnectRecord record = new ConnectRecord(null, null, 
System.currentTimeMillis());
         record.addExtension("type", "type");
diff --git a/supports/connect-filter-transform/pom.xml 
b/supports/connect-filter-transform/pom.xml
index 35e0769..c9328d2 100644
--- a/supports/connect-filter-transform/pom.xml
+++ b/supports/connect-filter-transform/pom.xml
@@ -21,7 +21,7 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
-        
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+        
<openmessaging-connector.version>0.1.2</openmessaging-connector.version>
         <apache.commons-text.version>1.10.0</apache.commons-text.version>
         <gson.version>2.8.9</gson.version>
         <cloudevents.version>2.3.0</cloudevents.version>
diff --git 
a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
 
b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
index b34ca98..9f704b3 100644
--- 
a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
+++ 
b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import io.cloudevents.SpecVersion;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluator;
 import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluatorBuilder;
@@ -70,7 +71,12 @@ public class EventBridgeFilterTransform implements 
io.openmessaging.connector.ap
     }
 
     @Override
-    public void start(KeyValue config) {
+    public void start(ComponentContext componentContext) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
         this.evaluator = 
PatternEvaluatorBuilder.build(config.getString("filterPattern"));
     }
 
diff --git 
a/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
 
b/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
index 7698cf8..40cca1d 100644
--- 
a/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
+++ 
b/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
@@ -30,7 +30,7 @@ public class EventBridgeFilterTransformTest {
         EventBridgeFilterTransform transform = new 
EventBridgeFilterTransform();
         KeyValue keyValue = new DefaultKeyValue();
         keyValue.put("filterPattern", "{\"source\":[\"acs.mns\"]}");
-        transform.start(keyValue);
+        transform.init(keyValue);
 
         ConnectRecord record = new ConnectRecord(null, null, 
System.currentTimeMillis());
         record.addExtension("source", "acs.demo");


Reply via email to