This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new f22bec4  Update Runtimer Listener Offer Connect Record (#57)
f22bec4 is described below

commit f22bec4ac3bc6148d9e5c7b6b6b81db1f13bf615
Author: Artisan <[email protected]>
AuthorDate: Tue Mar 14 16:54:01 2023 +0800

    Update Runtimer Listener Offer Connect Record (#57)
    
    Update Runtimer Listener Offer Connect Record
---
 adapter/runtimer/README.md                         | 11 ++++
 .../eventbridge/adapter/runtimer/Runtimer.java     |  6 +-
 .../adapter/runtimer/boot/EventBusListener.java    | 44 +++++++++++++-
 .../adapter/runtimer/boot/EventRuleTransfer.java   | 68 ++--------------------
 .../adapter/runtimer/boot/EventTargetPusher.java   |  8 +--
 .../runtimer/boot/listener/ListenerFactory.java    | 33 ++++++++++-
 .../runtimer/boot/transfer/TransformEngine.java    |  6 +-
 ...ConfigDefine.java => RuntimerConfigDefine.java} |  4 +-
 .../service/PusherConfigManageServiceImpl.java     | 33 +++++------
 9 files changed, 112 insertions(+), 101 deletions(-)

diff --git a/adapter/runtimer/README.md b/adapter/runtimer/README.md
index bb66b9e..4e508ba 100644
--- a/adapter/runtimer/README.md
+++ b/adapter/runtimer/README.md
@@ -26,6 +26,17 @@ TODO:
 metrics, logger, exception
 detail
 
+3.10 Note
+本次会议纪要:
+1、runtimer配置从DB加载;
+2、任务配置的命名:targetRunner;
+3、更新任务配置:从Service修改成API+存储 Watch;
+4、EventBusListener: 不能出现RMQ&Message,只能围绕Connect Recorded;
+5、EventBusListener:支持位点提交;
+
+下次会议时间:3月14日,下周二 19:00
+
+
 
 
 
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 6c5095a..896885d 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -21,12 +21,10 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfig;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,8 +34,6 @@ import javax.annotation.PostConstruct;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index dfeb3e5..0c40235 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -19,7 +19,14 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Lists;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -28,11 +35,13 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTarg
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.CollectionUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -116,8 +125,9 @@ public class EventBusListener extends ServiceThread {
                             return;
                         }
                         for (MessageExt messageExt : messageExts) {
-                            listenerFactory.offerListenEvent(messageExt);
-                            logger.debug("consumer - {} - offer listen event - 
 {}", JSON.toJSONString(pullConsumer), JSON.toJSON(messageExt));
+                            ConnectRecord eventRecord = 
convertToSinkRecord(messageExt);
+                            listenerFactory.offerEventRecord(eventRecord);
+                            logger.debug("consumer - {} - offer listen event 
record -  {} - by - message event- {}", JSON.toJSONString(pullConsumer), 
eventRecord, messageExt);
                         }
                     } finally {
                         pullConsumer.commitSync();
@@ -132,6 +142,36 @@ public class EventBusListener extends ServiceThread {
         return this.getClass().getSimpleName();
     }
 
+    /**
+     * MessageExt convert to connect record
+     * @param messageExt
+     * @return
+     */
+    private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
+        Map<String, String> properties = messageExt.getProperties();
+        Schema schema;
+        Long timestamp;
+        ConnectRecord sinkRecord;
+        String connectTimestamp = 
properties.get(RuntimerConfigDefine.CONNECT_TIMESTAMP);
+        timestamp = StringUtils.isNotEmpty(connectTimestamp) ? 
Long.valueOf(connectTimestamp) : null;
+        String connectSchema = 
properties.get(RuntimerConfigDefine.CONNECT_SCHEMA);
+        schema = StringUtils.isNotEmpty(connectSchema) ? 
JSON.parseObject(connectSchema, Schema.class) : null;
+        byte[] body = messageExt.getBody();
+        RecordPartition recordPartition = 
listenerFactory.convertToRecordPartition(messageExt.getTopic(), 
messageExt.getBrokerName(), messageExt.getQueueId());
+        RecordOffset recordOffset = 
listenerFactory.convertToRecordOffset(messageExt.getQueueOffset());
+        String bodyStr = new String(body, StandardCharsets.UTF_8);
+        sinkRecord = new ConnectRecord(recordPartition, recordOffset, 
timestamp, schema, bodyStr);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, 
messageExt.getTopic());
+        if (MapUtils.isNotEmpty(properties)) {
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                keyValue.put(entry.getKey(), entry.getValue());
+            }
+        }
+        sinkRecord.addExtension(keyValue);
+        return sinkRecord;
+    }
+
     /**
      * consumer update listener
      */
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index 3be7b8a..f14fdec 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -18,27 +18,17 @@
 package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
 
 import com.alibaba.fastjson.JSON;
-import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.message.MessageExt;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -59,8 +49,6 @@ public class EventRuleTransfer extends ServiceThread {
 
     private ExecutorService executorService = new ThreadPoolExecutor(20,60, 
1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
 
-    private ExecutorService singleExecutor = 
Executors.newSingleThreadScheduledExecutor();
-
     public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, 
PusherConfigManageService pusherConfigManageService){
         this.plugin = plugin;
         this.listenerFactory = listenerFactory;
@@ -72,18 +60,6 @@ public class EventRuleTransfer extends ServiceThread {
         this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
     }
 
-    private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
-        {
-            add("MIN_OFFSET");
-            add("TRACE_ON");
-            add("MAX_OFFSET");
-            add("MSG_REGION");
-            add("UNIQ_KEY");
-            add("WAIT");
-            add("TAGS");
-        }
-    };
-
     @Override
     public String getServiceName() {
         return this.getClass().getSimpleName();
@@ -92,21 +68,20 @@ public class EventRuleTransfer extends ServiceThread {
     @Override
     public void run() {
         while (!stopped){
-            MessageExt messageExt = listenerFactory.takeListenerEvent();
-            if(Objects.isNull(messageExt)){
-                logger.info("listen message is empty, continue by curTime - 
{}", System.currentTimeMillis());
+            ConnectRecord eventRecord = listenerFactory.takeEventRecord();
+            if(Objects.isNull(eventRecord)){
+                logger.info("listen eventRecord is empty, continue by curTime 
- {}", System.currentTimeMillis());
                 this.waitForRunning(1000);
                 continue;
             }
             executorService.submit(() -> {
-                ConnectRecord connectRecord = 
convertToSinkDataEntry(messageExt);
                 // extension add sub
                 // rule - target
                 for (TargetKeyValue targetKeyValue : 
taskTransformMap.keySet()){
                     // add threadPool for cup task
                     // attention coreSize
                     TransformEngine<ConnectRecord> transformEngine = 
taskTransformMap.get(targetKeyValue);
-                    ConnectRecord transformRecord = 
transformEngine.doTransforms(connectRecord);
+                    ConnectRecord transformRecord = 
transformEngine.doTransforms(eventRecord);
                     if(Objects.isNull(transformRecord)){
                         continue;
                     }
@@ -147,41 +122,6 @@ public class EventRuleTransfer extends ServiceThread {
         return curTaskTransformMap;
     }
 
-    /**
-     * MessageExt convert to connect record
-     * @param message
-     * @return
-     */
-    private ConnectRecord convertToSinkDataEntry(MessageExt message) {
-        Map<String, String> properties = message.getProperties();
-        Schema schema;
-        Long timestamp;
-        ConnectRecord sinkDataEntry;
-        String connectTimestamp = 
properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
-        timestamp = StringUtils.isNotEmpty(connectTimestamp) ? 
Long.valueOf(connectTimestamp) : null;
-        String connectSchema = 
properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
-        schema = StringUtils.isNotEmpty(connectSchema) ? 
JSON.parseObject(connectSchema, Schema.class) : null;
-        byte[] body = message.getBody();
-        RecordPartition recordPartition = 
listenerFactory.convertToRecordPartition(message.getTopic(), 
message.getBrokerName(), message.getQueueId());
-        RecordOffset recordOffset = 
listenerFactory.convertToRecordOffset(message.getQueueOffset());
-        String bodyStr = new String(body, StandardCharsets.UTF_8);
-        sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, 
timestamp, schema, bodyStr);
-        KeyValue keyValue = new DefaultKeyValue();
-        if (MapUtils.isNotEmpty(properties)) {
-            for (Map.Entry<String, String> entry : properties.entrySet()) {
-                if (MQ_SYS_KEYS.contains(entry.getKey())) {
-                    keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
-                } else if (entry.getKey().startsWith("connect-ext-")) {
-                    keyValue.put(entry.getKey().replaceAll("connect-ext-", 
""), entry.getValue());
-                } else {
-                    keyValue.put(entry.getKey(), entry.getValue());
-                }
-            }
-        }
-        sinkDataEntry.addExtension(keyValue);
-        return sinkDataEntry;
-    }
-
     /**
      * transform update listener
      */
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 105c4b6..d324fe4 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import io.netty.util.internal.ConcurrentSet;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import lombok.SneakyThrows;
 import org.apache.commons.collections.MapUtils;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
@@ -31,14 +30,13 @@ import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyV
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 
 /**
  * event target push to sink task
@@ -81,7 +79,7 @@ public class EventTargetPusher extends ServiceThread {
         }
         for(TargetKeyValue targetKeyValue : taskProperty){
             try{
-                String taskClass = 
targetKeyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+                String taskClass = 
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
                 ClassLoader loader = plugin.getPluginClassLoader(taskClass);
                 Class taskClazz;
                 boolean isolationFlag = false;
@@ -121,7 +119,7 @@ public class EventTargetPusher extends ServiceThread {
             // task-id for unique-key at ConnectKeyValue
             // ConnectKeyValue -> new class for name
             // also add in ConnectRecord class system property
-            String taskPushName = 
targetKeyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+            String taskPushName = 
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
             // add thread pool
             for(SinkTask sinkTask : pusherTasks){
                 if(sinkTask.getClass().getName().equals(taskPushName)){
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 33804b4..07f010e 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -56,6 +56,8 @@ public class ListenerFactory {
 
     private BlockingQueue<MessageExt> eventMessage = new 
LinkedBlockingQueue(50000);
 
+    private BlockingQueue<ConnectRecord> eventRecord = new 
LinkedBlockingQueue<>(50000);
+
     private BlockingQueue<Map<TargetKeyValue, ConnectRecord>> targetQueue = 
new LinkedBlockingQueue<>(50000);
 
     @Value("${rocketmq.namesrvAddr:}")
@@ -107,6 +109,31 @@ public class ListenerFactory {
         return null;
     }
 
+    /**
+     * offer event record
+     * @param connectRecord
+     * @return
+     */
+    public boolean offerEventRecord(ConnectRecord connectRecord){
+        return eventRecord.offer(connectRecord);
+    }
+
+    /**
+     * take event record
+     * @return
+     */
+    public ConnectRecord takeEventRecord() {
+        if(eventRecord.isEmpty()){
+            return null;
+        }
+        try {
+            return eventRecord.take();
+        }catch (Exception exception){
+            logger.error("take event record exception - stack-> ", exception);
+        }
+        return null;
+    }
+
     public String createInstance(String servers) {
         String[] serversArray = servers.split(";");
         List<String> serversList = new ArrayList<String>();
@@ -125,7 +152,7 @@ public class ListenerFactory {
      * @return
      */
     public List<String> parseTopicList(TargetKeyValue taskConfig) {
-        String messageQueueStr = 
taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+        String messageQueueStr = 
taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
         if (StringUtils.isBlank(messageQueueStr)) {
             return null;
         }
@@ -141,7 +168,7 @@ public class ListenerFactory {
     public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs) 
{
         Set<String> allTopicList = Sets.newHashSet();
         for(TargetKeyValue taskConfig : taskConfigs){
-            String messageQueueStr = 
taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+            String messageQueueStr = 
taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
             if (StringUtils.isBlank(messageQueueStr)) {
                 continue;
             }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index 8c75702..9b59487 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +49,7 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
 
     private static final String COMMA = ",";
 
-    private static final String PREFIX = RuntimeConfigDefine.TRANSFORMS + "-";
+    private static final String PREFIX = RuntimerConfigDefine.TRANSFORMS + "-";
 
     public TransformEngine(KeyValue config, Plugin plugin) {
         this.config = config;
@@ -59,7 +59,7 @@ public class TransformEngine<R extends ConnectRecord> 
implements AutoCloseable {
     }
 
     private void init() {
-        String transformsStr = 
config.getString(RuntimeConfigDefine.TRANSFORMS);
+        String transformsStr = 
config.getString(RuntimerConfigDefine.TRANSFORMS);
         if (StringUtils.isBlank(transformsStr)) {
             log.warn("no transforms config, {}", JSON.toJSONString(config));
             return;
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
similarity index 97%
rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 3a208d2..ebf1c88 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -24,9 +24,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 /**
- * Define keys for connector and task configs.
+ * Define keys for target runner configs.
  */
-public class RuntimeConfigDefine {
+public class RuntimerConfigDefine {
 
     /**
      * The full class name of a specific connector implements.
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
index 1dde731..58a66fd 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
@@ -20,14 +20,13 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 import com.google.common.collect.Lists;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.connector.Connector;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import 
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
 import 
org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
 import org.springframework.beans.factory.annotation.Value;
@@ -97,7 +96,7 @@ public class PusherConfigManageServiceImpl implements 
PusherConfigManageService
         Map<String, TargetKeyValue> connectorConfigs = 
connectorKeyValueStore.getKVMap();
         for (String connectorName : connectorConfigs.keySet()) {
             TargetKeyValue config = connectorConfigs.get(connectorName);
-            if (0 != config.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
+            if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
                 continue;
             }
             result.put(connectorName, config);
@@ -109,9 +108,9 @@ public class PusherConfigManageServiceImpl implements 
PusherConfigManageService
     public String putConnectTargetConfig(String connectorName, TargetKeyValue 
configs) throws Exception {
         TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
         if (null != exist) {
-            Long updateTimestamp = 
exist.getLong(RuntimeConfigDefine.UPDATE_TIMESTAMP);
+            Long updateTimestamp = 
exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
             if (null != updateTimestamp) {
-                configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, 
updateTimestamp);
+                configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, 
updateTimestamp);
             }
         }
         if (configs.equals(exist)) {
@@ -119,14 +118,14 @@ public class PusherConfigManageServiceImpl implements 
PusherConfigManageService
         }
 
         Long currentTimestamp = System.currentTimeMillis();
-        configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
-        for (String requireConfig : RuntimeConfigDefine.REQUEST_CONFIG) {
+        configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+        for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
             if (!configs.containsKey(requireConfig)) {
                 return "Request config key: " + requireConfig;
             }
         }
 
-        String connectorClass = 
configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+        String connectorClass = 
configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
         ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
         Class clazz;
         if (null != classLoader) {
@@ -144,7 +143,7 @@ public class PusherConfigManageServiceImpl implements 
PusherConfigManageService
 
     @Override
     public void recomputeTaskConfigs(String connectorName, Connector 
connector, Long currentTimestamp, TargetKeyValue configs) {
-        int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+        int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
         List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
         List<TargetKeyValue> converterdConfigs = new ArrayList<>();
         for (KeyValue keyValue : taskConfigs) {
@@ -152,19 +151,19 @@ public class PusherConfigManageServiceImpl implements 
PusherConfigManageService
             for (String key : keyValue.keySet()) {
                 newKeyValue.put(key, keyValue.getString(key));
             }
-            newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, 
connector.taskClass().getName());
-            newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, 
currentTimestamp);
+            newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, 
connector.taskClass().getName());
+            newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, 
currentTimestamp);
 
-            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME, 
configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
-            newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES, 
configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, 
configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
+            newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, 
configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
             Set<String> connectConfigKeySet = configs.keySet();
             for (String connectConfigKey : connectConfigKeySet) {
-                if 
(connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+                if 
(connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
                     newKeyValue.put(connectConfigKey, 
configs.getString(connectConfigKey));
                 }
             }
             converterdConfigs.add(newKeyValue);
-            
connectTopicNames.add(configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+            
connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
         }
         putTaskConfigs(connectorName, converterdConfigs);
     }
@@ -175,8 +174,8 @@ public class PusherConfigManageServiceImpl implements 
PusherConfigManageService
         if(Objects.isNull(config)){
             return;
         }
-        config.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, 
System.currentTimeMillis());
-        config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
+        config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, 
System.currentTimeMillis());
+        config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
         List<TargetKeyValue> taskConfigList = 
taskKeyValueStore.get(connectorName);
         taskConfigList.add(config);
         connectorKeyValueStore.put(connectorName, config);

Reply via email to