This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new 2ca9952 Standard the run context of the config json file (#89)
2ca9952 is described below
commit 2ca9952ef9a239d407653229d9d7c878e9b8bd5b
Author: Artisan <[email protected]>
AuthorDate: Thu Apr 20 14:29:02 2023 +0800
Standard the run context of the config json file (#89)
Standard the run context of the config json file
---
adapter/runtimer/pom.xml | 2 +-
.../adapter/runtimer/boot/EventRuleTransfer.java | 7 ++-
.../runtimer/boot/listener/CirculatorContext.java | 11 ++--
.../boot/listener/RocketMQEventSubscriber.java | 4 +-
.../runtimer/boot/transfer/TransformEngine.java | 59 ++++++++++++----------
.../runtimer/common/entity/TargetRunnerConfig.java | 8 +--
.../runtimer/config/RuntimerConfigDefine.java | 21 ++------
.../AbstractTargetRunnerConfigObserver.java | 4 --
.../service/TargetRunnerConfigObserver.java | 2 -
.../runtimer/src/main/resources/target-runner.json | 15 +++---
10 files changed, 61 insertions(+), 72 deletions(-)
diff --git a/adapter/runtimer/pom.xml b/adapter/runtimer/pom.xml
index 20a8ee6..158f76c 100644
--- a/adapter/runtimer/pom.xml
+++ b/adapter/runtimer/pom.xml
@@ -64,7 +64,7 @@
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
- <version>0.9.11</version>
+ <version>0.9.10</version>
<scope>compile</scope>
</dependency>
<dependency>
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index cbb3585..635c4bd 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.StringUtils;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
@@ -71,10 +70,10 @@ public class EventRuleTransfer extends ServiceThread {
continue;
}
// the event channel take rocket mq topic name as default
- String eventChannelKey = RuntimerConfigDefine.CONNECT_TOPICNAME;
- String eventChannel = eventRecord.getExtension(eventChannelKey);
+ String eventChannelName = RuntimerConfigDefine.CHANNEL_NAME;
+ String eventChannel = eventRecord.getExtension(eventChannelName);
Set<TransformEngine<ConnectRecord>> adaptTransformSet =
latestTransformMap.values().stream()
- .filter(engine ->
eventChannel.equals(engine.getConnectConfig(eventChannelKey)))
+ .filter(engine ->
eventChannel.equals(engine.getConnectConfig(eventChannelName)))
.collect(Collectors.toSet());
if(CollectionUtils.isEmpty(adaptTransformSet)){
logger.warn("adapt specific topic ref transform engine is
empty, eventChannelName- {}", eventChannel);
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
index de371f7..fd29d8f 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
@@ -34,6 +34,7 @@ import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDef
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
import java.util.List;
import java.util.Map;
@@ -161,15 +162,15 @@ public class CirculatorContext implements
TargetRunnerListener {
switch (refreshTypeEnum) {
case ADD:
case UPDATE:
- TargetKeyValue targetKeyValue = new TargetKeyValue();
-
targetRunnerConfig.getComponents().forEach(targetKeyValue::putAll);
- TransformEngine<ConnectRecord> transformChain = new
TransformEngine<>(targetKeyValue, plugin);
+ TransformEngine<ConnectRecord> transformChain = new
TransformEngine<>(targetRunnerConfig.getComponents(), plugin);
taskTransformMap.put(runnerName, transformChain);
+ int endIndex = targetRunnerConfig.getComponents().size() -1;
+ TargetKeyValue targetKeyValue = new
TargetKeyValue(targetRunnerConfig.getComponents().get(endIndex));
SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
pusherTaskMap.put(runnerName, sinkTask);
- String pusherClass =
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+ String pusherClass =
targetKeyValue.getString(RuntimerConfigDefine.RUNNER_CLASS);
if (StringUtils.isNotEmpty(pusherClass) &&
!pusherExecutorMap.containsKey(pusherClass)) {
pusherExecutorMap.put(pusherClass,
initDefaultThreadPoolExecutor(pusherClass));
}
@@ -200,7 +201,7 @@ public class CirculatorContext implements
TargetRunnerListener {
* @return
*/
private SinkTask initTargetSinkTask(TargetKeyValue targetKeyValue) {
- String taskClass =
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+ String taskClass =
targetKeyValue.getString(RuntimerConfigDefine.RUNNER_CLASS);
ClassLoader loader = plugin.getPluginClassLoader(taskClass);
Class taskClazz;
boolean isolationFlag = false;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 10d25ac..aa263c7 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -159,7 +159,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
logger.warn("target runner config components is empty, config
info - {}", runnerConfig);
continue;
}
-
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CONNECT_TOPICNAME));
+
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CHANNEL_NAME));
}
return listenTopics;
}
@@ -280,7 +280,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
String bodyStr = new String(body, StandardCharsets.UTF_8);
sinkRecord = new ConnectRecord(recordPartition, recordOffset,
timestamp, schema, bodyStr);
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME,
messageExt.getTopic());
+ keyValue.put(RuntimerConfigDefine.CHANNEL_NAME, messageExt.getTopic());
if (MapUtils.isNotEmpty(properties)) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
keyValue.put(entry.getKey(), entry.getValue());
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index c29fc05..56f8549 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -17,16 +17,13 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer;
-import com.alibaba.fastjson.JSON;
-import com.google.common.base.Splitter;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
-import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
@@ -35,8 +32,8 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
-import java.util.Set;
public class TransformEngine<R extends ConnectRecord> implements AutoCloseable
{
@@ -44,7 +41,7 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
private final List<Transform> transformList;
- private Map<String,List<Transform>> transformListMap;
+ private List<Map<String, String>> transferConfigs;
private final KeyValue config;
@@ -54,35 +51,25 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
private static final String PREFIX = RuntimerConfigDefine.TRANSFORMS + "-";
- public TransformEngine(KeyValue config, Plugin plugin) {
- this.config = config;
+ public TransformEngine(List<Map<String, String>> transferConfigs, Plugin
plugin) {
+ this.transferConfigs = transferConfigs;
+ this.config = formatTargetKey(transferConfigs);
this.plugin = plugin;
transformList = new ArrayList<>(8);
init();
}
private void init() {
- String transformsStr =
config.getString(RuntimerConfigDefine.TRANSFORMS);
- if (StringUtils.isBlank(transformsStr)) {
- log.warn("no transforms config, {}", JSON.toJSONString(config));
- return;
- }
- List<String> transformList =
Splitter.on(COMMA).omitEmptyStrings().trimResults().splitToList(transformsStr);
- if (CollectionUtils.isEmpty(transformList)) {
- log.warn("transforms config is null, {}",
JSON.toJSONString(config));
- return;
- }
- transformList.stream().forEach(transformStr -> {
- String transformClassKey = PREFIX + transformStr + "-class";
- String transformClass = config.getString(transformClassKey);
+ int endIndex = transferConfigs.size() - 1;
+ for (int index = 1; index < endIndex; index++) {
+ Map<String, String> transferMap = transferConfigs.get(index);
+ String transformClass =
transferMap.get(RuntimerConfigDefine.RUNNER_CLASS);
try {
Transform transform = getTransform(transformClass);
KeyValue transformConfig = new DefaultKeyValue();
- Set<String> configKeys = config.keySet();
- for (String key : configKeys) {
- if (key.startsWith(PREFIX + transformStr) &&
!key.equals(transformClassKey)) {
- String originKey = key.replace(PREFIX + transformStr +
"-", "");
- transformConfig.put(originKey, config.getString(key));
+ for (String key : transferMap.keySet()) {
+ if (!key.equals(RuntimerConfigDefine.RUNNER_CLASS)) {
+ transformConfig.put(key, transferMap.get(key));
}
}
transform.validate(transformConfig);
@@ -91,7 +78,25 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
} catch (Exception e) {
log.error("transform new instance error", e);
}
- });
+ }
+ }
+
+ /**
+ * format listener and pusher key
+ * @param components
+ * @return
+ */
+ private TargetKeyValue formatTargetKey(List<Map<String, String>>
components) {
+ if(CollectionUtils.isEmpty(components)){
+ return null;
+ }
+ int startIndex = 0;
+ int endIndex = components.size() - 1;
+ // init listener key
+ TargetKeyValue targetKeyValue = new
TargetKeyValue(components.get(startIndex));
+ // init pusher key
+ targetKeyValue.put(RuntimerConfigDefine.TASK_CLASS,
components.get(endIndex).get(RuntimerConfigDefine.RUNNER_CLASS));
+ return targetKeyValue;
}
/**
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index 63edbc8..20f3d5a 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -57,11 +57,11 @@ public class TargetRunnerConfig implements Serializable {
@Override
public String toString() {
- //TODO
return "TargetRunnerConfig{" +
- "connectName='" + name + '\'' +
- ", properties=" + components +
- '}';
+ "name='" + name + '\'' +
+ ", components=" + components +
+ ", runOptions=" + runOptions +
+ '}';
}
private boolean isEqualsComponents(List<Map<String, String>> source,
List<Map<String, String>> target) {
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
index 332f33d..578e92a 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigDefine.java
@@ -32,6 +32,10 @@ public class RuntimerConfigDefine {
*/
public static final String CONNECTOR_CLASS = "connector-class";
+ public static final String RUNNER_CLASS = "class";
+
+ public static final String TRANSFER_CLASS = "transfer-class";
+
public static final String CONNECTOR_DIRECT_ENABLE =
"connector-direct-enable";
public static final String TASK_CLASS = "task-class";
@@ -79,7 +83,7 @@ public class RuntimerConfigDefine {
public static final String CONNECT_SHARDINGKEY = "connect-shardingkey";
- public static final String CONNECT_TOPICNAME = "connect-topicname";
+ public static final String CHANNEL_NAME = "channel-name";
public static final String CONNECT_RULE_NAME = "connect-rule-name";
@@ -101,19 +105,4 @@ public class RuntimerConfigDefine {
public static final String TARGET_RUNNER_KEY = "eventBusName";
- /**
- * The required key for all configurations.
- */
- public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
- {
- add(CONNECTOR_CLASS);
- add(CONNECT_TOPICNAME);
- }
- };
-
- /**
- * Maximum allowed message size in bytes, the default vaule is 4M.
- */
- public static final int MAX_MESSAGE_SIZE =
Integer.parseInt(System.getProperty("rocketmq.runtime.max.message.size",
"4194304"));
-
}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
index 619bc63..5cdbe62 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -88,8 +88,4 @@ public abstract class AbstractTargetRunnerConfigObserver
implements TargetRunner
listener.onDeleteTargetRunner(targetRunnerConfig);
}
}
-
- public Map<String, List<TargetKeyValue>> getTaskConfigs() {
- return null;
- }
}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
index c6c7670..6c713e3 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java
@@ -42,6 +42,4 @@ public interface TargetRunnerConfigObserver {
*/
void registerListener(TargetRunnerListener listener);
- @Deprecated
- Map<String, List<TargetKeyValue>> getTaskConfigs();
}
diff --git a/adapter/runtimer/src/main/resources/target-runner.json
b/adapter/runtimer/src/main/resources/target-runner.json
index e94e506..6ffcaf7 100644
--- a/adapter/runtimer/src/main/resources/target-runner.json
+++ b/adapter/runtimer/src/main/resources/target-runner.json
@@ -4,17 +4,18 @@
"components":[
{
"runner-name": "demo-runner",
- "connect-topicname":"eventbridge%654321%demo-bus_1678348282165"
+ "channel-name":"eventbridge%654321%demo-bus_1678348282165"
},
{
- "transforms":"filter,transform,",
- "transforms-filter-filterPattern":"{}",
-
"transforms-filter-class":"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform",
-
"transforms-transform-data":"{\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\",\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\"}",
- "transforms-transform-class":
"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform"
+ "filterPattern":"{}",
+
"class":"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform"
},
{
-
"task-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask",
+
"data":"{\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\",\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\"}",
+ "class":
"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform"
+ },
+ {
+ "class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask",
"webHook":"https://oapi.dingtalk.com/robot/send?access_token=7f78aa4734ea9bd245984e47b6764ccb950b4292e4f6f9424dff92909f485f16",
"secretKey":"SEC8a898c9df7b6415090a8f1341d9eed000c815a89f301f2de87302a1e802dbd69"
}