This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new f22bec4 Update Runtimer Listener Offer Connect Record (#57)
f22bec4 is described below
commit f22bec4ac3bc6148d9e5c7b6b6b81db1f13bf615
Author: Artisan <[email protected]>
AuthorDate: Tue Mar 14 16:54:01 2023 +0800
Update Runtimer Listener Offer Connect Record (#57)
Update Runtimer Listener Offer Connect Record
---
adapter/runtimer/README.md | 11 ++++
.../eventbridge/adapter/runtimer/Runtimer.java | 6 +-
.../adapter/runtimer/boot/EventBusListener.java | 44 +++++++++++++-
.../adapter/runtimer/boot/EventRuleTransfer.java | 68 ++--------------------
.../adapter/runtimer/boot/EventTargetPusher.java | 8 +--
.../runtimer/boot/listener/ListenerFactory.java | 33 ++++++++++-
.../runtimer/boot/transfer/TransformEngine.java | 6 +-
...ConfigDefine.java => RuntimerConfigDefine.java} | 4 +-
.../service/PusherConfigManageServiceImpl.java | 33 +++++------
9 files changed, 112 insertions(+), 101 deletions(-)
diff --git a/adapter/runtimer/README.md b/adapter/runtimer/README.md
index bb66b9e..4e508ba 100644
--- a/adapter/runtimer/README.md
+++ b/adapter/runtimer/README.md
@@ -26,6 +26,17 @@ TODO:
metrics, logger, exception
detail
+3.10 Note
+本次会议纪要:
+1、runtimer配置从DB加载;
+2、任务配置的命名:targetRunner;
+3、更新任务配置:从Service修改成API+存储 Watch;
+4、EventBusListener: 不能出现RMQ&Message,只能围绕Connect Recorded;
+5、EventBusListener:支持位点提交;
+
+下次会议时间:3月14日,下周二 19:00
+
+
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index 6c5095a..896885d 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -21,12 +21,10 @@ import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfig;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,8 +34,6 @@ import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index dfeb3e5..0c40235 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -19,7 +19,14 @@ package
org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.internal.DefaultKeyValue;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -28,11 +35,13 @@ import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTarg
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
+import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
@@ -116,8 +125,9 @@ public class EventBusListener extends ServiceThread {
return;
}
for (MessageExt messageExt : messageExts) {
- listenerFactory.offerListenEvent(messageExt);
- logger.debug("consumer - {} - offer listen event -
{}", JSON.toJSONString(pullConsumer), JSON.toJSON(messageExt));
+ ConnectRecord eventRecord =
convertToSinkRecord(messageExt);
+ listenerFactory.offerEventRecord(eventRecord);
+ logger.debug("consumer - {} - offer listen event
record - {} - by - message event- {}", JSON.toJSONString(pullConsumer),
eventRecord, messageExt);
}
} finally {
pullConsumer.commitSync();
@@ -132,6 +142,36 @@ public class EventBusListener extends ServiceThread {
return this.getClass().getSimpleName();
}
+ /**
+ * MessageExt convert to connect record
+ * @param messageExt
+ * @return
+ */
+ private ConnectRecord convertToSinkRecord(MessageExt messageExt) {
+ Map<String, String> properties = messageExt.getProperties();
+ Schema schema;
+ Long timestamp;
+ ConnectRecord sinkRecord;
+ String connectTimestamp =
properties.get(RuntimerConfigDefine.CONNECT_TIMESTAMP);
+ timestamp = StringUtils.isNotEmpty(connectTimestamp) ?
Long.valueOf(connectTimestamp) : null;
+ String connectSchema =
properties.get(RuntimerConfigDefine.CONNECT_SCHEMA);
+ schema = StringUtils.isNotEmpty(connectSchema) ?
JSON.parseObject(connectSchema, Schema.class) : null;
+ byte[] body = messageExt.getBody();
+ RecordPartition recordPartition =
listenerFactory.convertToRecordPartition(messageExt.getTopic(),
messageExt.getBrokerName(), messageExt.getQueueId());
+ RecordOffset recordOffset =
listenerFactory.convertToRecordOffset(messageExt.getQueueOffset());
+ String bodyStr = new String(body, StandardCharsets.UTF_8);
+ sinkRecord = new ConnectRecord(recordPartition, recordOffset,
timestamp, schema, bodyStr);
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME,
messageExt.getTopic());
+ if (MapUtils.isNotEmpty(properties)) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ keyValue.put(entry.getKey(), entry.getValue());
+ }
+ }
+ sinkRecord.addExtension(keyValue);
+ return sinkRecord;
+ }
+
/**
* consumer update listener
*/
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index 3be7b8a..f14fdec 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -18,27 +18,17 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
-import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.ConnectRecord;
-import io.openmessaging.connector.api.data.RecordOffset;
-import io.openmessaging.connector.api.data.RecordPartition;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.message.MessageExt;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
@@ -59,8 +49,6 @@ public class EventRuleTransfer extends ServiceThread {
private ExecutorService executorService = new ThreadPoolExecutor(20,60,
1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
- private ExecutorService singleExecutor =
Executors.newSingleThreadScheduledExecutor();
-
public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory,
PusherConfigManageService pusherConfigManageService){
this.plugin = plugin;
this.listenerFactory = listenerFactory;
@@ -72,18 +60,6 @@ public class EventRuleTransfer extends ServiceThread {
this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig));
}
- private static final Set<String> MQ_SYS_KEYS = new HashSet<String>() {
- {
- add("MIN_OFFSET");
- add("TRACE_ON");
- add("MAX_OFFSET");
- add("MSG_REGION");
- add("UNIQ_KEY");
- add("WAIT");
- add("TAGS");
- }
- };
-
@Override
public String getServiceName() {
return this.getClass().getSimpleName();
@@ -92,21 +68,20 @@ public class EventRuleTransfer extends ServiceThread {
@Override
public void run() {
while (!stopped){
- MessageExt messageExt = listenerFactory.takeListenerEvent();
- if(Objects.isNull(messageExt)){
- logger.info("listen message is empty, continue by curTime -
{}", System.currentTimeMillis());
+ ConnectRecord eventRecord = listenerFactory.takeEventRecord();
+ if(Objects.isNull(eventRecord)){
+ logger.info("listen eventRecord is empty, continue by curTime
- {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
}
executorService.submit(() -> {
- ConnectRecord connectRecord =
convertToSinkDataEntry(messageExt);
// extension add sub
// rule - target
for (TargetKeyValue targetKeyValue :
taskTransformMap.keySet()){
// add threadPool for cup task
// attention coreSize
TransformEngine<ConnectRecord> transformEngine =
taskTransformMap.get(targetKeyValue);
- ConnectRecord transformRecord =
transformEngine.doTransforms(connectRecord);
+ ConnectRecord transformRecord =
transformEngine.doTransforms(eventRecord);
if(Objects.isNull(transformRecord)){
continue;
}
@@ -147,41 +122,6 @@ public class EventRuleTransfer extends ServiceThread {
return curTaskTransformMap;
}
- /**
- * MessageExt convert to connect record
- * @param message
- * @return
- */
- private ConnectRecord convertToSinkDataEntry(MessageExt message) {
- Map<String, String> properties = message.getProperties();
- Schema schema;
- Long timestamp;
- ConnectRecord sinkDataEntry;
- String connectTimestamp =
properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
- timestamp = StringUtils.isNotEmpty(connectTimestamp) ?
Long.valueOf(connectTimestamp) : null;
- String connectSchema =
properties.get(RuntimeConfigDefine.CONNECT_SCHEMA);
- schema = StringUtils.isNotEmpty(connectSchema) ?
JSON.parseObject(connectSchema, Schema.class) : null;
- byte[] body = message.getBody();
- RecordPartition recordPartition =
listenerFactory.convertToRecordPartition(message.getTopic(),
message.getBrokerName(), message.getQueueId());
- RecordOffset recordOffset =
listenerFactory.convertToRecordOffset(message.getQueueOffset());
- String bodyStr = new String(body, StandardCharsets.UTF_8);
- sinkDataEntry = new ConnectRecord(recordPartition, recordOffset,
timestamp, schema, bodyStr);
- KeyValue keyValue = new DefaultKeyValue();
- if (MapUtils.isNotEmpty(properties)) {
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (MQ_SYS_KEYS.contains(entry.getKey())) {
- keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
- } else if (entry.getKey().startsWith("connect-ext-")) {
- keyValue.put(entry.getKey().replaceAll("connect-ext-",
""), entry.getValue());
- } else {
- keyValue.put(entry.getKey(), entry.getValue());
- }
- }
- }
- sinkDataEntry.addExtension(keyValue);
- return sinkDataEntry;
- }
-
/**
* transform update listener
*/
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 105c4b6..d324fe4 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import io.netty.util.internal.ConcurrentSet;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
-import lombok.SneakyThrows;
import org.apache.commons.collections.MapUtils;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
@@ -31,14 +30,13 @@ import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyV
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
/**
* event target push to sink task
@@ -81,7 +79,7 @@ public class EventTargetPusher extends ServiceThread {
}
for(TargetKeyValue targetKeyValue : taskProperty){
try{
- String taskClass =
targetKeyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+ String taskClass =
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
ClassLoader loader = plugin.getPluginClassLoader(taskClass);
Class taskClazz;
boolean isolationFlag = false;
@@ -121,7 +119,7 @@ public class EventTargetPusher extends ServiceThread {
// task-id for unique-key at ConnectKeyValue
// ConnectKeyValue -> new class for name
// also add in ConnectRecord class system property
- String taskPushName =
targetKeyValue.getString(RuntimeConfigDefine.TASK_CLASS);
+ String taskPushName =
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
// add thread pool
for(SinkTask sinkTask : pusherTasks){
if(sinkTask.getClass().getName().equals(taskPushName)){
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
index 33804b4..07f010e 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -56,6 +56,8 @@ public class ListenerFactory {
private BlockingQueue<MessageExt> eventMessage = new
LinkedBlockingQueue(50000);
+ private BlockingQueue<ConnectRecord> eventRecord = new
LinkedBlockingQueue<>(50000);
+
private BlockingQueue<Map<TargetKeyValue, ConnectRecord>> targetQueue =
new LinkedBlockingQueue<>(50000);
@Value("${rocketmq.namesrvAddr:}")
@@ -107,6 +109,31 @@ public class ListenerFactory {
return null;
}
+ /**
+ * offer event record
+ * @param connectRecord
+ * @return
+ */
+ public boolean offerEventRecord(ConnectRecord connectRecord){
+ return eventRecord.offer(connectRecord);
+ }
+
+ /**
+ * take event record
+ * @return
+ */
+ public ConnectRecord takeEventRecord() {
+ if(eventRecord.isEmpty()){
+ return null;
+ }
+ try {
+ return eventRecord.take();
+ }catch (Exception exception){
+ logger.error("take event record exception - stack-> ", exception);
+ }
+ return null;
+ }
+
public String createInstance(String servers) {
String[] serversArray = servers.split(";");
List<String> serversList = new ArrayList<String>();
@@ -125,7 +152,7 @@ public class ListenerFactory {
* @return
*/
public List<String> parseTopicList(TargetKeyValue taskConfig) {
- String messageQueueStr =
taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+ String messageQueueStr =
taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
if (StringUtils.isBlank(messageQueueStr)) {
return null;
}
@@ -141,7 +168,7 @@ public class ListenerFactory {
public List<String> parseTopicListByList(List<TargetKeyValue> taskConfigs)
{
Set<String> allTopicList = Sets.newHashSet();
for(TargetKeyValue taskConfig : taskConfigs){
- String messageQueueStr =
taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME);
+ String messageQueueStr =
taskConfig.getString(RuntimerConfigDefine.CONNECT_TOPICNAME);
if (StringUtils.isBlank(messageQueueStr)) {
continue;
}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index 8c75702..9b59487 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +49,7 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
private static final String COMMA = ",";
- private static final String PREFIX = RuntimeConfigDefine.TRANSFORMS + "-";
+ private static final String PREFIX = RuntimerConfigDefine.TRANSFORMS + "-";
public TransformEngine(KeyValue config, Plugin plugin) {
this.config = config;
@@ -59,7 +59,7 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
}
private void init() {
- String transformsStr =
config.getString(RuntimeConfigDefine.TRANSFORMS);
+ String transformsStr =
config.getString(RuntimerConfigDefine.TRANSFORMS);
if (StringUtils.isBlank(transformsStr)) {
log.warn("no transforms config, {}", JSON.toJSONString(config));
return;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
similarity index 97%
rename from
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
rename to
adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 3a208d2..ebf1c88 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimeConfigDefine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -24,9 +24,9 @@ import java.util.HashSet;
import java.util.Set;
/**
- * Define keys for connector and task configs.
+ * Define keys for target runner configs.
*/
-public class RuntimeConfigDefine {
+public class RuntimerConfigDefine {
/**
* The full class name of a specific connector implements.
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
index 1dde731..58a66fd 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java
@@ -20,14 +20,13 @@ package
org.apache.rocketmq.eventbridge.adapter.runtimer.service;
import com.google.common.collect.Lists;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.Connector;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore;
-import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimeConfigDefine;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter;
import org.springframework.beans.factory.annotation.Value;
@@ -97,7 +96,7 @@ public class PusherConfigManageServiceImpl implements
PusherConfigManageService
Map<String, TargetKeyValue> connectorConfigs =
connectorKeyValueStore.getKVMap();
for (String connectorName : connectorConfigs.keySet()) {
TargetKeyValue config = connectorConfigs.get(connectorName);
- if (0 != config.getInt(RuntimeConfigDefine.CONFIG_DELETED)) {
+ if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) {
continue;
}
result.put(connectorName, config);
@@ -109,9 +108,9 @@ public class PusherConfigManageServiceImpl implements
PusherConfigManageService
public String putConnectTargetConfig(String connectorName, TargetKeyValue
configs) throws Exception {
TargetKeyValue exist = connectorKeyValueStore.get(connectorName);
if (null != exist) {
- Long updateTimestamp =
exist.getLong(RuntimeConfigDefine.UPDATE_TIMESTAMP);
+ Long updateTimestamp =
exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP);
if (null != updateTimestamp) {
- configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP,
updateTimestamp);
+ configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP,
updateTimestamp);
}
}
if (configs.equals(exist)) {
@@ -119,14 +118,14 @@ public class PusherConfigManageServiceImpl implements
PusherConfigManageService
}
Long currentTimestamp = System.currentTimeMillis();
- configs.put(RuntimeConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
- for (String requireConfig : RuntimeConfigDefine.REQUEST_CONFIG) {
+ configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp);
+ for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) {
if (!configs.containsKey(requireConfig)) {
return "Request config key: " + requireConfig;
}
}
- String connectorClass =
configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+ String connectorClass =
configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS);
ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
Class clazz;
if (null != classLoader) {
@@ -144,7 +143,7 @@ public class PusherConfigManageServiceImpl implements
PusherConfigManageService
@Override
public void recomputeTaskConfigs(String connectorName, Connector
connector, Long currentTimestamp, TargetKeyValue configs) {
- int maxTask = configs.getInt(RuntimeConfigDefine.MAX_TASK, 1);
+ int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1);
List<KeyValue> taskConfigs = connector.taskConfigs(maxTask);
List<TargetKeyValue> converterdConfigs = new ArrayList<>();
for (KeyValue keyValue : taskConfigs) {
@@ -152,19 +151,19 @@ public class PusherConfigManageServiceImpl implements
PusherConfigManageService
for (String key : keyValue.keySet()) {
newKeyValue.put(key, keyValue.getString(key));
}
- newKeyValue.put(RuntimeConfigDefine.TASK_CLASS,
connector.taskClass().getName());
- newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESTAMP,
currentTimestamp);
+ newKeyValue.put(RuntimerConfigDefine.TASK_CLASS,
connector.taskClass().getName());
+ newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP,
currentTimestamp);
- newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAME,
configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
- newKeyValue.put(RuntimeConfigDefine.CONNECT_TOPICNAMES,
configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAMES));
+ newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME,
configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
+ newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES,
configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES));
Set<String> connectConfigKeySet = configs.keySet();
for (String connectConfigKey : connectConfigKeySet) {
- if
(connectConfigKey.startsWith(RuntimeConfigDefine.TRANSFORMS)) {
+ if
(connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) {
newKeyValue.put(connectConfigKey,
configs.getString(connectConfigKey));
}
}
converterdConfigs.add(newKeyValue);
-
connectTopicNames.add(configs.getString(RuntimeConfigDefine.CONNECT_TOPICNAME));
+
connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME));
}
putTaskConfigs(connectorName, converterdConfigs);
}
@@ -175,8 +174,8 @@ public class PusherConfigManageServiceImpl implements
PusherConfigManageService
if(Objects.isNull(config)){
return;
}
- config.put(RuntimeConfigDefine.UPDATE_TIMESTAMP,
System.currentTimeMillis());
- config.put(RuntimeConfigDefine.CONFIG_DELETED, 1);
+ config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP,
System.currentTimeMillis());
+ config.put(RuntimerConfigDefine.CONFIG_DELETED, 1);
List<TargetKeyValue> taskConfigList =
taskKeyValueStore.get(connectorName);
taskConfigList.add(config);
connectorKeyValueStore.put(connectorName, config);