This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new d30d9b8 Optimize fileObserver format and Support module (#76)
d30d9b8 is described below
commit d30d9b8ff5bd8a1739e8b2380dcce7be688d9a31
Author: Artisan <[email protected]>
AuthorDate: Fri Apr 7 17:45:30 2023 +0800
Optimize fileObserver format and Support module (#76)
Optimize fileObserver format and Support module (#76)
---
.../eventbridge/adapter/runtimer/Runtimer.java | 3 +-
.../adapter/runtimer/boot/EventBusListener.java | 4 +-
.../adapter/runtimer/boot/EventRuleTransfer.java | 76 ++++++++++++++++------
.../adapter/runtimer/boot/EventTargetPusher.java | 18 +++--
.../runtimer/boot/listener/CirculatorContext.java | 60 ++++++++++++-----
.../boot/listener/RocketMQEventSubscriber.java | 48 ++++++++------
.../runtimer/boot/transfer/TransformEngine.java | 14 ++++
.../runtimer/common/entity/TargetKeyValue.java | 5 ++
.../adapter/runtimer/config/RuntimerConfig.java | 37 -----------
.../runtimer/config/RuntimerConfigProps.java | 53 +++++++++++++++
.../service/TargetRunnerConfigOnFileObserver.java | 5 +-
.../src/main/resources/runtimer.properties | 6 +-
.../runtimer/src/main/resources/target-runner.json | 42 ++++++------
supports/connect-cloudevent-transform/pom.xml | 2 +-
.../transform/eventbridge/CloudEventTransform.java | 8 ++-
.../rocketmq/connect/CloudEventTransformTest.java | 2 +-
supports/connect-eventbridge-transform/pom.xml | 2 +-
.../eventbridge/EventBridgeTransform.java | 8 ++-
.../rocketmq/connect/EventBridgeTransformTest.java | 2 +-
supports/connect-filter-transform/pom.xml | 2 +-
.../eventbridge/EventBridgeFilterTransform.java | 8 ++-
.../EventBridgeFilterTransformTest.java | 2 +-
22 files changed, 269 insertions(+), 138 deletions(-)
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
index f871e08..399fba9 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java
@@ -25,6 +25,7 @@ import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.Circulator
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.AbstractTargetRunnerConfigObserver;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class Runtimer {
private CirculatorContext circulatorContext;
- private AbstractTargetRunnerConfigObserver runnerConfigObserver;
+ private TargetRunnerConfigObserver runnerConfigObserver;
public Runtimer(CirculatorContext circulatorContext) {
this.circulatorContext = circulatorContext;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
index d9227d0..7a34aa3 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java
@@ -36,9 +36,9 @@ public class EventBusListener extends ServiceThread {
private static final Logger logger =
LoggerFactory.getLogger(EventBusListener.class);
- private CirculatorContext circulatorContext;
+ private final CirculatorContext circulatorContext;
- private EventSubscriber eventSubscriber;
+ private final EventSubscriber eventSubscriber;
public EventBusListener(CirculatorContext circulatorContext,
EventSubscriber eventSubscriber) {
this.circulatorContext = circulatorContext;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
index f9492f6..cbb3585 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java
@@ -18,18 +18,24 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot;
import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Lists;
import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
+import
org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
/**
* receive event and transfer the rule to pusher
@@ -38,9 +44,7 @@ public class EventRuleTransfer extends ServiceThread {
private static final Logger logger =
LoggerFactory.getLogger(EventRuleTransfer.class);
- private CirculatorContext circulatorContext;
-
- private ExecutorService executorService = new ThreadPoolExecutor(20, 60,
1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100));
+ private final CirculatorContext circulatorContext;
public EventRuleTransfer(CirculatorContext circulatorContext) {
this.circulatorContext = circulatorContext;
@@ -54,27 +58,57 @@ public class EventRuleTransfer extends ServiceThread {
@Override
public void run() {
while (!stopped) {
- // add CompletableFuture
ConnectRecord eventRecord = circulatorContext.takeEventRecord();
if (Objects.isNull(eventRecord)) {
logger.info("listen eventRecord is empty, continue by curTime
- {}", System.currentTimeMillis());
this.waitForRunning(1000);
continue;
}
- executorService.submit(() -> {
- // extension add sub
- // rule - target
-
circulatorContext.getTaskTransformMap().entrySet().forEach(entry -> {
- TransformEngine<ConnectRecord> transformEngine =
entry.getValue();
- ConnectRecord transformRecord =
transformEngine.doTransforms(eventRecord);
- if (Objects.isNull(transformRecord)) {
- return;
- }
- // a bean for maintain
- circulatorContext.offerTargetTaskQueue(transformRecord);
- logger.debug("offer target task queue succeed, targetMap -
{}", JSON.toJSONString(transformRecord));
- });
+ Map<String, TransformEngine<ConnectRecord>> latestTransformMap =
circulatorContext.getTaskTransformMap();
+ if(MapUtils.isEmpty(latestTransformMap)){
+ logger.warn("latest transform engine is empty, continue by
curTime - {}", System.currentTimeMillis());
+ this.waitForRunning(3000);
+ continue;
+ }
+ // the event channel takes the RocketMQ topic name by default
+ String eventChannelKey = RuntimerConfigDefine.CONNECT_TOPICNAME;
+ String eventChannel = eventRecord.getExtension(eventChannelKey);
+ Set<TransformEngine<ConnectRecord>> adaptTransformSet =
latestTransformMap.values().stream()
+ .filter(engine ->
eventChannel.equals(engine.getConnectConfig(eventChannelKey)))
+ .collect(Collectors.toSet());
+ if(CollectionUtils.isEmpty(adaptTransformSet)){
+ logger.warn("adapt specific topic ref transform engine is
empty, eventChannelName- {}", eventChannel);
+ this.waitForRunning(3000);
+ continue;
+ }
+ List<ConnectRecord> afterTransformConnect = Lists.newArrayList();
+ List<CompletableFuture<Void>> completableFutures =
Lists.newArrayList();
+ adaptTransformSet.forEach(transfer->{
+ CompletableFuture<Void> transformFuture =
CompletableFuture.supplyAsync(()-> transfer.doTransforms(eventRecord))
+ .exceptionally((exception) -> {
+ logger.error("transfer do transform event record
failed,stackTrace-", exception);
+ return null;
+ })
+ .thenAccept(record-> {
+ if(Objects.nonNull(record)){
+ String runnerNameKey =
RuntimerConfigDefine.RUNNER_NAME;
+ String taskClassKey =
RuntimerConfigDefine.TASK_CLASS;
+ record.getExtensions().put(runnerNameKey,
transfer.getConnectConfig(runnerNameKey));
+ record.getExtensions().put(taskClassKey,
transfer.getConnectConfig(taskClassKey));
+ afterTransformConnect.add(record);
+ }
+ });
+ completableFutures.add(transformFuture);
});
+
+ try{
+ CompletableFuture.allOf(completableFutures.toArray(new
CompletableFuture[adaptTransformSet.size()])).get();
+ circulatorContext.offerTargetTaskQueue(afterTransformConnect);
+ logger.info("offer target task queues succeed, transforms -
{}", JSON.toJSONString(afterTransformConnect));
+ }catch (Exception exception){
+ logger.error("transfer event record failed, stackTrace-",
exception);
+ }
+
}
}
}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
index 9c17d2a..43e6b29 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
/**
* event target push to sink task
@@ -39,7 +40,7 @@ public class EventTargetPusher extends ServiceThread{
private static final Logger logger =
LoggerFactory.getLogger(EventTargetPusher.class);
- private CirculatorContext circulatorContext;
+ private final CirculatorContext circulatorContext;
public EventTargetPusher(CirculatorContext circulatorContext) {
this.circulatorContext = circulatorContext;
@@ -58,11 +59,16 @@ public class EventTargetPusher extends ServiceThread{
logger.debug("start push content by pusher - {}",
JSON.toJSONString(targetRecord));
}
- Map<String, SinkTask> latestTaskMap =
circulatorContext.getPusherTaskMap();
- String runnerName =
targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
- SinkTask sinkTask = latestTaskMap.get(runnerName);
- // add thread pool
- sinkTask.put(Lists.newArrayList(targetRecord));
+ ExecutorService executorService =
circulatorContext.getExecutorService(targetRecord.getExtensions().getString(RuntimerConfigDefine.TASK_CLASS));
+ executorService.execute(() -> {
+ try {
+ String runnerName =
targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
+ SinkTask sinkTask =
circulatorContext.getPusherTaskMap().get(runnerName);;
+ sinkTask.put(Lists.newArrayList(targetRecord));
+ }catch (Exception exception){
+ logger.error(getServiceName() + " push target exception,
record - " + targetRecord + " , stackTrace-", exception);
+ }
+ });
}
}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
index 58e6b04..de371f7 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/CirculatorContext.java
@@ -17,9 +17,11 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext;
import
org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName;
@@ -36,9 +38,7 @@ import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
/**
* event circulator context for transfer and pusher
@@ -48,7 +48,7 @@ public class CirculatorContext implements
TargetRunnerListener {
private final static Logger logger =
LoggerFactory.getLogger(LoggerName.EventBus_Listener);
- private BlockingQueue<ConnectRecord> eventRecord = new
LinkedBlockingQueue<>(50000);
+ private BlockingQueue<ConnectRecord> eventQueue = new
LinkedBlockingQueue<>(50000);
private BlockingQueue<ConnectRecord> targetQueue = new
LinkedBlockingQueue<>(50000);
@@ -56,6 +56,8 @@ public class CirculatorContext implements
TargetRunnerListener {
private Map<String/*RunnerName*/, SinkTask> pusherTaskMap = new
ConcurrentHashMap<>(20);
+ private Map<String/*PusherCLass*/, ExecutorService> pusherExecutorMap =
new ConcurrentHashMap<>(10);
+
private Plugin plugin;
public CirculatorContext(Plugin plugin){
@@ -95,8 +97,8 @@ public class CirculatorContext implements
TargetRunnerListener {
*
* @param connectRecords
*/
- public boolean offerEventRecords(List<ConnectRecord> connectRecords) {
- return eventRecord.addAll(connectRecords);
+ public boolean offerEventRecords(List<ConnectRecord> connectRecords) {
+ return eventQueue.addAll(connectRecords);
}
/**
@@ -105,11 +107,11 @@ public class CirculatorContext implements
TargetRunnerListener {
* @return
*/
public ConnectRecord takeEventRecord() {
- if (eventRecord.isEmpty()) {
+ if (eventQueue.isEmpty()) {
return null;
}
try {
- return eventRecord.take();
+ return eventQueue.take();
} catch (Exception exception) {
logger.error("take event record exception - stack-> ", exception);
}
@@ -124,8 +126,8 @@ public class CirculatorContext implements
TargetRunnerListener {
return pusherTaskMap;
}
- public boolean offerTargetTaskQueue(ConnectRecord connectRecord) {
- return targetQueue.offer(connectRecord);
+ public boolean offerTargetTaskQueue(List<ConnectRecord> connectRecords) {
+ return targetQueue.addAll(connectRecords);
}
public ConnectRecord takeTargetMap() {
@@ -140,6 +142,15 @@ public class CirculatorContext implements
TargetRunnerListener {
return null;
}
+ /**
+ * get specific thread pool by push name
+ * @param pushName
+ * @return
+ */
+ public ExecutorService getExecutorService(String pushName){
+ return pusherExecutorMap.get(pushName);
+ }
+
/**
* refresh target runner where config changed
* @param targetRunnerConfig
@@ -147,17 +158,22 @@ public class CirculatorContext implements
TargetRunnerListener {
*/
private void refreshRunnerMetadata(TargetRunnerConfig targetRunnerConfig,
RefreshTypeEnum refreshTypeEnum) {
String runnerName = targetRunnerConfig.getName();
- switch (refreshTypeEnum){
+ switch (refreshTypeEnum) {
case ADD:
case UPDATE:
- for(Map<String, String> configMap :
targetRunnerConfig.getComponents()){
- TargetKeyValue targetKeyValue = new
TargetKeyValue(configMap);
- TransformEngine<ConnectRecord> transformChain = new
TransformEngine<>(targetKeyValue, plugin);
- taskTransformMap.put(runnerName, transformChain);
+ TargetKeyValue targetKeyValue = new TargetKeyValue();
+
targetRunnerConfig.getComponents().forEach(targetKeyValue::putAll);
+ TransformEngine<ConnectRecord> transformChain = new
TransformEngine<>(targetKeyValue, plugin);
+ taskTransformMap.put(runnerName, transformChain);
+
+ SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
+ pusherTaskMap.put(runnerName, sinkTask);
- SinkTask sinkTask = initTargetSinkTask(targetKeyValue);
- pusherTaskMap.put(runnerName, sinkTask);
+ String pusherClass =
targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS);
+ if (StringUtils.isNotEmpty(pusherClass) &&
!pusherExecutorMap.containsKey(pusherClass)) {
+ pusherExecutorMap.put(pusherClass,
initDefaultThreadPoolExecutor(pusherClass));
}
+
break;
case DELETE:
taskTransformMap.remove(runnerName);
@@ -168,6 +184,16 @@ public class CirculatorContext implements
TargetRunnerListener {
}
}
+ /**
+ * init default thread pool params, support auto config
+ * @param threadPollName
+ * @return
+ */
+ private ExecutorService initDefaultThreadPoolExecutor(String
threadPollName) {
+ ThreadFactoryBuilder threadFactory = new
ThreadFactoryBuilder().setNameFormat(threadPollName);
+ return new ThreadPoolExecutor(200, 300, 1, TimeUnit.SECONDS, new
LinkedBlockingQueue<>(300), threadFactory.build());
+ }
+
/**
* init target sink task
* @param targetKeyValue
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
index 4ca0507..1e504c1 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java
@@ -27,6 +27,7 @@ import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -43,11 +44,10 @@ import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.support.PropertiesLoaderUtils;
-import org.springframework.util.CollectionUtils;
import java.nio.charset.StandardCharsets;
import java.util.*;
-import java.util.stream.Collectors;
+import java.util.concurrent.CompletableFuture;
/**
* RocketMQ implement event subscriber
@@ -58,12 +58,14 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
private DefaultLitePullConsumer pullConsumer;
- private TargetRunnerConfigObserver runnerConfigObserver;
+ private final TargetRunnerConfigObserver runnerConfigObserver;
private Integer pullTimeOut;
private String namesrvAddr;
+ private Integer pullBatchSize;
+
private static final String SEMICOLON = ";";
private static final String SYS_DEFAULT_GROUP =
"event-bridge-default-group";
@@ -100,25 +102,31 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
@Override
public List<ConnectRecord> pull() {
- List<MessageExt> messageExts = pullConsumer.poll(pullTimeOut);
- if (CollectionUtils.isEmpty(messageExts)) {
+ List<MessageExt> messages = pullConsumer.poll(pullTimeOut);
+ if (CollectionUtils.isEmpty(messages)) {
logger.info("consumer poll message empty , consumer - {}",
JSON.toJSONString(pullConsumer));
return null;
}
List<ConnectRecord> connectRecords = Lists.newArrayList();
- for (MessageExt messageExt : messageExts) {
- ConnectRecord eventRecord = convertToSinkRecord(messageExt);
- connectRecords.add(eventRecord);
- if(logger.isDebugEnabled()){
- logger.debug("offer listen event record - {} - by message
event- {}", eventRecord, messageExt);
- }
- }
+ List<CompletableFuture<Void>> completableFutures =
Lists.newArrayList();
+ messages.forEach(item->{
+ CompletableFuture<Void> recordCompletableFuture =
CompletableFuture.supplyAsync(()-> convertToSinkRecord(item))
+ .exceptionally((exception) -> {
+ logger.error("execute completable job
failed,stackTrace-", exception);
+ return null;
+ })
+ .thenAccept(connectRecords::add);
+ completableFutures.add(recordCompletableFuture);
+ });
+
+ CompletableFuture.allOf(completableFutures.toArray(new
CompletableFuture[messages.size()])).join();
+
return connectRecords;
}
@Override
public void commit(List<ConnectRecord> connectRecordList) {
-
+ // TODO
}
/**
@@ -127,17 +135,18 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
* @return
*/
public Set<String> parseTopicsByRunnerConfigs(Set<TargetRunnerConfig>
targetRunnerConfigs){
-
if(org.apache.commons.collections.CollectionUtils.isEmpty(targetRunnerConfigs)){
+ if(CollectionUtils.isEmpty(targetRunnerConfigs)){
logger.warn("target runner config is empty, parse to topic
failed!");
return null;
}
Set<String> listenTopics = Sets.newHashSet();
for(TargetRunnerConfig runnerConfig : targetRunnerConfigs){
List<Map<String,String>> runnerConfigMap =
runnerConfig.getComponents();
-
if(org.apache.commons.collections.CollectionUtils.isEmpty(runnerConfigMap)){
+ if(CollectionUtils.isEmpty(runnerConfigMap)){
+ logger.warn("target runner config components is empty, config
info - {}", runnerConfig);
continue;
}
-
listenTopics.addAll(runnerConfigMap.stream().map(item->item.get(RuntimerConfigDefine.CONNECT_TOPICNAME)).collect(Collectors.toSet()));
+
listenTopics.add(runnerConfigMap.iterator().next().get(RuntimerConfigDefine.CONNECT_TOPICNAME));
}
return listenTopics;
}
@@ -150,10 +159,10 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
Properties properties =
PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
namesrvAddr = properties.getProperty("rocketmq.namesrvAddr");
pullTimeOut =
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullTimeOut"));
+ pullBatchSize =
Integer.valueOf(properties.getProperty("rocketmq.consumer.pullBatchSize"));
}catch (Exception exception){
-
+ logger.error("init rocket mq property exception, stack trace-",
exception);
}
-
}
/**
@@ -173,6 +182,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
consumer.setConsumerGroup(createGroupName(SYS_DEFAULT_GROUP));
consumer.setNamesrvAddr(namesrvAddr);
+ consumer.setPullBatchSize(pullBatchSize);
try {
for(String topic : topics){
consumer.subscribe(topic, "*");
@@ -214,7 +224,7 @@ public class RocketMQEventSubscriber extends
EventSubscriber {
*/
private MessageQueue parseMessageQueueList(String messageQueueStr) {
List<String> messageQueueStrList =
Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr);
- if
(org.apache.commons.collections.CollectionUtils.isEmpty(messageQueueStrList) ||
messageQueueStrList.size() != 3) {
+ if (CollectionUtils.isEmpty(messageQueueStrList) ||
messageQueueStrList.size() != 3) {
return null;
}
return new MessageQueue(messageQueueStrList.get(0),
messageQueueStrList.get(1), Integer.valueOf(messageQueueStrList.get(2)));
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
index 9b59487..046d50d 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/transfer/TransformEngine.java
@@ -91,6 +91,11 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
});
}
+ /**
+ * transform event record for target record
+ * @param connectRecord
+ * @return
+ */
public R doTransforms(R connectRecord) {
if (transformList.size() == 0) {
return connectRecord;
@@ -105,6 +110,15 @@ public class TransformEngine<R extends ConnectRecord>
implements AutoCloseable {
return connectRecord;
}
+ /**
+ * get task config value by key
+ * @param configKey
+ * @return
+ */
+ public String getConnectConfig(String configKey){
+ return config.getString(configKey);
+ }
+
private Transform getTransform(String transformClass) throws Exception {
ClassLoader loader = plugin.getPluginClassLoader(transformClass);
final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
index 5d52128..956d48e 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetKeyValue.java
@@ -139,6 +139,11 @@ public class TargetKeyValue implements KeyValue,
Serializable {
this.properties = properties;
}
+ public KeyValue putAll(Map<String,String> configProps){
+ this.properties.putAll(configProps);
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
deleted file mode 100644
index af4cafa..0000000
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
-
-import lombok.Data;
-import org.springframework.beans.factory.annotation.Configurable;
-import org.springframework.beans.factory.annotation.Value;
-
-@Data
-@Configurable
-public class RuntimerConfig {
-
- @Value("${rumtimer.name:}")
- private String runtimeName;
-
- @Value("${runtimer.pluginpath:}")
- private String pluginPath;
-
- @Value("${runtimer.storePathRootDir:}")
- private String storePathRootDir;
-
-}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigProps.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigProps.java
new file mode 100644
index 0000000..4f7827c
--- /dev/null
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfigProps.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtimer.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.support.PropertiesLoaderUtils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * runtimer properties factory
+ */
+public class RuntimerConfigProps {
+
+ private final static Logger logger =
LoggerFactory.getLogger(RuntimerConfigProps.class);
+
+ private Properties properties;
+
+ private RuntimerConfigProps(){
+ try {
+ properties =
PropertiesLoaderUtils.loadAllProperties("runtimer.properties");
+ } catch (IOException exception) {
+ logger.error("runtime load properties failed, stackTrace-",
exception);
+ }
+ }
+
+ private static class RuntimerConfigPropsHolder{
+ private static final RuntimerConfigProps instance = new
RuntimerConfigProps();
+ }
+
+ public static RuntimerConfigProps build(){
+ return RuntimerConfigPropsHolder.instance;
+ }
+
+
+}
diff --git
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 8764694..63f1e9d 100644
---
a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++
b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -50,7 +50,6 @@ public class TargetRunnerConfigOnFileObserver extends
AbstractTargetRunnerConfig
public static final String DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME =
"target-runner.json";
-
private static ScheduledExecutorService service =
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver",
false));
@@ -61,9 +60,7 @@ public class TargetRunnerConfigOnFileObserver extends
AbstractTargetRunnerConfig
}
public TargetRunnerConfigOnFileObserver() {
- if(StringUtils.isEmpty(pathName)){
- this.pathName = getConfigFilePath();
- }
+ this.pathName = getConfigFilePath();
super.getTargetRunnerConfig().addAll(getLatestTargetRunnerConfig());
this.addListen(pathName, this);
}
diff --git a/adapter/runtimer/src/main/resources/runtimer.properties
b/adapter/runtimer/src/main/resources/runtimer.properties
index b5659c1..7b290e5 100644
--- a/adapter/runtimer/src/main/resources/runtimer.properties
+++ b/adapter/runtimer/src/main/resources/runtimer.properties
@@ -16,8 +16,12 @@
## rocketmq
rocketmq.namesrvAddr=localhost:9876
rocketmq.consumer.pullTimeOut = 3000
+rocketmq.consumer.pullBatchSize=20
rocketmq.cluster.name=DefaultCluster
## runtimer
rumtimer.name=eventbridge-runtimer
runtimer.pluginpath=/Users/Local/eventbridge/plugin
-runtimer.storePathRootDir=/Users/Local/eventbridge/store
\ No newline at end of file
+runtimer.storePathRootDir=/Users/Local/eventbridge/store
+## listener
+listener.eventQueue.threshold=50000
+listener.targetQueue.threshold=50000
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/resources/target-runner.json
b/adapter/runtimer/src/main/resources/target-runner.json
index 11b5088..e94e506 100644
--- a/adapter/runtimer/src/main/resources/target-runner.json
+++ b/adapter/runtimer/src/main/resources/target-runner.json
@@ -1,23 +1,23 @@
[
- {
-
- "name":"xxxxx",
- "components":[ {
- "class" : "org.apache.rocketmq.connect.FileStream",
- "path" : "xxxxxxxx",
- "name" : "demo"
- },
- {
- "class" : "org.apache.rocketmq.connect.transforms.PatternRename",
- "pattern" : "company",
- "replacement": "company02"
- },
- {
- "class" : "org.apache.rocketmq.connect.HttpSinkTask",
- "url" : "http://xxxxx/demo"
- } ],
- "runOptions":{
- "taskSize":1
- }
- }
+ {
+ "name":"demo-runner",
+ "components":[
+ {
+ "runner-name": "demo-runner",
+ "connect-topicname":"eventbridge%654321%demo-bus_1678348282165"
+ },
+ {
+ "transforms":"filter,transform,",
+ "transforms-filter-filterPattern":"{}",
+ "transforms-filter-class":"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform",
+ "transforms-transform-data":"{\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\",\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\"}",
+ "transforms-transform-class": "org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform"
+ },
+ {
+ "task-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask",
+
"webHook":"https://oapi.dingtalk.com/robot/send?access_token=7f78aa4734ea9bd245984e47b6764ccb950b4292e4f6f9424dff92909f485f16",
+
"secretKey":"SEC8a898c9df7b6415090a8f1341d9eed000c815a89f301f2de87302a1e802dbd69"
+ }
+ ]
+ }
]
\ No newline at end of file
diff --git a/supports/connect-cloudevent-transform/pom.xml b/supports/connect-cloudevent-transform/pom.xml
index 9e47a7f..88caf65 100644
--- a/supports/connect-cloudevent-transform/pom.xml
+++ b/supports/connect-cloudevent-transform/pom.xml
@@ -21,7 +21,7 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
- <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+ <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
<cloudevents.version>2.3.0</cloudevents.version>
</properties>
diff --git a/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java b/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
index 0e43b2a..6569478 100644
--- a/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
+++ b/supports/connect-cloudevent-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/CloudEventTransform.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.v1.CloudEventV1;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.rocketmq.eventbridge.tools.transform.*;
@@ -167,7 +168,7 @@ public class CloudEventTransform implements io.openmessaging.connector.api.compo
}
@Override
- public void start(KeyValue config) {
+ public void init(KeyValue config) {
this.idTransform = buildTransform(config, CloudEventV1.ID);
this.sourceTransform = buildTransform(config, CloudEventV1.SOURCE);
this.specversionTransform = buildTransform(config,
CloudEventV1.SPECVERSION);
@@ -177,6 +178,11 @@ public class CloudEventTransform implements io.openmessaging.connector.api.compo
this.subjectTransform = buildTransform(config, CloudEventV1.SUBJECT);
}
+ @Override
+ public void start(ComponentContext componentContext) {
+
+ }
+
private Transform buildTransform(KeyValue config, String key) {
String transformConfig = config.getString(key);
if (transformConfig == null || Strings.isNullOrEmpty(transformConfig))
{
diff --git a/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java b/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
index 261604e..a9ed3c4 100644
--- a/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
+++ b/supports/connect-cloudevent-transform/src/test/java/org/apache/rocketmq/connect/CloudEventTransformTest.java
@@ -37,7 +37,7 @@ public class CloudEventTransformTest {
keyValue.put("subject",
"{\"value\":\"$.data.subject\",\"form\":\"JSONPATH\"}");
keyValue.put("type", "{\"value\":\"$.type\",\"form\":\"JSONPATH\"}");
CloudEventTransform cloudEventTransform = new CloudEventTransform();
- cloudEventTransform.start(keyValue);
+ cloudEventTransform.init(keyValue);
ConnectRecord record = new ConnectRecord(null, null,
System.currentTimeMillis());
record.addExtension("type", "type");
diff --git a/supports/connect-eventbridge-transform/pom.xml b/supports/connect-eventbridge-transform/pom.xml
index aa72f4d..02a5a8d 100644
--- a/supports/connect-eventbridge-transform/pom.xml
+++ b/supports/connect-eventbridge-transform/pom.xml
@@ -21,7 +21,7 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
- <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+ <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
<gson.version>2.8.9</gson.version>
</properties>
diff --git a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
index bc98d4b..3b14021 100644
--- a/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
+++ b/supports/connect-eventbridge-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeTransform.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.transform.eventbridge;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.SchemaBuilder;
import org.apache.rocketmq.eventbridge.tools.transform.*;
@@ -54,7 +55,7 @@ public class EventBridgeTransform implements io.openmessaging.connector.api.comp
}
@Override
- public void start(KeyValue config) {
+ public void init(KeyValue config) {
config.keySet()
.forEach(key -> {
TransformParam transformParam = new
Gson().fromJson(config.getString(key), TransformParam.class);
@@ -62,6 +63,11 @@ public class EventBridgeTransform implements io.openmessaging.connector.api.comp
});
}
+ @Override
+ public void start(ComponentContext componentContext) {
+
+ }
+
@Override
public void stop() {
diff --git a/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java b/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
index 31c2e96..8395709 100644
--- a/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
+++ b/supports/connect-eventbridge-transform/src/test/java/org/apache/rocketmq/connect/EventBridgeTransformTest.java
@@ -38,7 +38,7 @@ public class EventBridgeTransformTest {
"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\","
+
"\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}");
EventBridgeTransform eventBridgeTransform = new EventBridgeTransform();
- eventBridgeTransform.start(keyValue);
+ eventBridgeTransform.init(keyValue);
ConnectRecord record = new ConnectRecord(null, null,
System.currentTimeMillis());
record.addExtension("type", "type");
diff --git a/supports/connect-filter-transform/pom.xml b/supports/connect-filter-transform/pom.xml
index 35e0769..c9328d2 100644
--- a/supports/connect-filter-transform/pom.xml
+++ b/supports/connect-filter-transform/pom.xml
@@ -21,7 +21,7 @@
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
- <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
+ <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
<apache.commons-text.version>1.10.0</apache.commons-text.version>
<gson.version>2.8.9</gson.version>
<cloudevents.version>2.3.0</cloudevents.version>
diff --git a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
index b34ca98..9f704b3 100644
--- a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
+++ b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonElement;
import io.cloudevents.SpecVersion;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.ComponentContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluator;
import org.apache.rocketmq.eventbridge.tools.pattern.PatternEvaluatorBuilder;
@@ -70,7 +71,12 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap
}
@Override
- public void start(KeyValue config) {
+ public void start(ComponentContext componentContext) {
+
+ }
+
+ @Override
+ public void init(KeyValue config) {
this.evaluator =
PatternEvaluatorBuilder.build(config.getString("filterPattern"));
}
diff --git a/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java b/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
index 7698cf8..40cca1d 100644
--- a/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
+++ b/supports/connect-filter-transform/src/test/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransformTest.java
@@ -30,7 +30,7 @@ public class EventBridgeFilterTransformTest {
EventBridgeFilterTransform transform = new
EventBridgeFilterTransform();
KeyValue keyValue = new DefaultKeyValue();
keyValue.put("filterPattern", "{\"source\":[\"acs.mns\"]}");
- transform.start(keyValue);
+ transform.init(keyValue);
ConnectRecord record = new ConnectRecord(null, null,
System.currentTimeMillis());
record.addExtension("source", "acs.demo");