This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch shenlin/runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 2a24698066e51ca6bdc74625a09228339d162ea1 Author: 2011shenlin <[email protected]> AuthorDate: Wed Mar 15 17:51:40 2023 +0800 feat:add TargetRunnerConfigObserver --- .../RocketMQConnectTargetRunnerAPIImpl.java | 40 +-- .../eventbridge/adapter/runtimer/Runtimer.java | 24 +- .../adapter/runtimer/boot/EventBusListener.java | 80 +++--- .../adapter/runtimer/boot/EventRuleTransfer.java | 82 ++++--- .../adapter/runtimer/boot/EventTargetPusher.java | 73 +++--- .../runtimer/boot/listener/ListenerFactory.java | 6 +- .../boot/listener/TargetRunnerListener.java | 44 ++++ ...erTargetEntity.java => TargetRunnerConfig.java} | 4 +- .../AbstractTargetRunnerConfigObserver.java | 95 ++++++++ .../service/PusherConfigManageServiceImpl.java | 271 --------------------- ...ervice.java => TargetRunnerConfigObserver.java} | 69 +----- .../TargetRunnerConfigOnControllerObserver.java | 48 ++++ .../service/TargetRunnerConfigOnDBObserver.java | 60 +++++ .../service/TargetRunnerConfigOnFileObserver.java | 92 +++++++ 14 files changed, 517 insertions(+), 471 deletions(-) diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java index fda982c..8971028 100644 --- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java +++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/connect/RocketMQConnectTargetRunnerAPIImpl.java @@ -20,11 +20,8 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.google.gson.Gson; - import java.util.ArrayList; import java.util.Map; -import java.util.Objects; - import lombok.SneakyThrows; import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectSourceRunnerContext; import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.context.RocketMQConnectTargetRunnerContext; @@ -32,7 +29,6 @@ import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.ActionStatus import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.CreateSinkConnectorRequest; import org.apache.rocketmq.eventbridge.adapter.rpc.impl.connect.dto.TransformRequest; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService; import org.apache.rocketmq.eventbridge.domain.common.enums.EventTargetStatusEnum; import org.apache.rocketmq.eventbridge.domain.model.Component; import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions; @@ -45,13 +41,10 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem private final RocketMQConnectClient rocketMQConnectClient; - private PusherConfigManageService pusherConfigManageService; - public RocketMQConnectTargetRunnerAPIImpl(EventDataRepository eventDataRepository, - RocketMQConnectClient rocketMQConnectClient, PusherConfigManageService pusherConfigManageService) { + RocketMQConnectClient rocketMQConnectClient) { super(eventDataRepository); this.rocketMQConnectClient = rocketMQConnectClient; - this.pusherConfigManageService = pusherConfigManageService; } @SneakyThrows @@ -64,14 +57,9 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem TransformRequest filterTransform = this.buildEventBridgeFilterTransform(filterPattern); TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform); TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass, - sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); - if(Objects.nonNull(pusherConfigManageService)){ - pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue); - }else { - // todo delete - rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass, - sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); - } + sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); + rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass, + sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue)); return new Gson().toJson(context); } @@ -87,17 +75,11 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem TransformRequest eventBridgeTransform = this.buildEventBridgeTransform(targetTransform); //create TargetKeyValue targetKeyValue = initSinkTaskConfig(name, topicName, sinkConnectorClass, - sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); - if(Objects.nonNull(pusherConfigManageService)){ - pusherConfigManageService.putConnectTargetConfig(name, targetKeyValue); - }else { - // todo delete - //stop - this.delete(runContext); - - rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass, - sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); - } + sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); + //stop + this.delete(runContext); + rocketMQConnectClient.createSinkConnector(name, topicName, sinkConnectorClass, + sinkConnectorConfig, Lists.newArrayList(filterTransform, eventBridgeTransform)); RocketMQConnectTargetRunnerContext context = new RocketMQConnectTargetRunnerContext(name, JSON.toJSONString(targetKeyValue)); return new Gson().toJson(context); } @@ -133,6 +115,7 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem /** * init sink task config + * * @param name * @param topicName * @param sinkClass @@ -140,7 +123,8 @@ public class RocketMQConnectTargetRunnerAPIImpl extends RocketMQConverter implem * @param transforms * @return */ - private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) { + private TargetKeyValue initSinkTaskConfig(String name, String topicName, String sinkClass, + Map<String, Object> sinkConfig, ArrayList<TransformRequest> transforms) { CreateSinkConnectorRequest request = new CreateSinkConnectorRequest(); request.setName(name); request.setTopicName(topicName); diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java index 896885d..44745d2 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -55,7 +55,7 @@ public class Runtimer extends ServiceThread{ private ListenerFactory listenerFactory; - private PusherConfigManageService pusherConfigManageService; + private TargetRunnerConfigObserver targetRunnerConfigObserver; private Map<String, List<TargetKeyValue>> taskConfigs = new HashMap<>(); @@ -67,21 +67,21 @@ public class Runtimer extends ServiceThread{ private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((Runnable r) -> new Thread(r, "RuntimerScheduledThread")); - public Runtimer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService configManageService) { + public Runtimer(Plugin plugin, ListenerFactory listenerFactory, TargetRunnerConfigObserver configManageService) { this.plugin = plugin; this.listenerFactory = listenerFactory; - this.pusherConfigManageService = configManageService; + this.targetRunnerConfigObserver = configManageService; } @PostConstruct public void initAndStart() { logger.info("init runtimer task config"); - this.taskConfigs = pusherConfigManageService.getTaskConfigs(); - listener = new EventBusListener(listenerFactory, pusherConfigManageService); + this.taskConfigs = targetRunnerConfigObserver.getTaskConfigs(); + listener = new EventBusListener(listenerFactory, targetRunnerConfigObserver); listener.initOrUpdateListenConsumer(taskConfigs); - transfer = new EventRuleTransfer(plugin, listenerFactory, pusherConfigManageService); + transfer = new EventRuleTransfer(plugin, listenerFactory, targetRunnerConfigObserver); transfer.initOrUpdateTaskTransform(taskConfigs); - pusher = new EventTargetPusher(plugin, listenerFactory, pusherConfigManageService); + pusher = new EventTargetPusher(plugin, listenerFactory, targetRunnerConfigObserver); pusher.initOrUpdatePusherTask(taskConfigs); startRuntimer(); } @@ -105,13 +105,5 @@ public class Runtimer extends ServiceThread{ pusher.start(); - scheduledExecutorService.scheduleAtFixedRate(() -> { - try { - this.pusherConfigManageService.persist(); - } catch (Exception e) { - logger.error("schedule persist config error.", e); - } - }, 500, 500, TimeUnit.MILLISECONDS); - } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java index 0c40235..8f3c707 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java @@ -25,31 +25,43 @@ import io.openmessaging.connector.api.data.RecordOffset; import io.openmessaging.connector.api.data.RecordPartition; import io.openmessaging.connector.api.data.Schema; import io.openmessaging.internal.DefaultKeyValue; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.QueueState; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.*; - /** * listen the event and offer to queue + * * @author artisan */ -public class EventBusListener extends ServiceThread { +public class EventBusListener extends ServiceThread implements TargetRunnerListener { private Logger logger = LoggerFactory.getLogger(EventBusListener.class); @@ -63,32 +75,33 @@ public class EventBusListener extends ServiceThread { private ListenerFactory listenerFactory; - private PusherConfigManageService pusherConfigManageService; + private TargetRunnerConfigObserver targetRunnerConfigObserver; - private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100)); + private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100)); private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000); - public EventBusListener(ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){ + public EventBusListener(ListenerFactory listenerFactory, TargetRunnerConfigObserver targetRunnerConfigObserver) { this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256); this.messageQueuesStateMap = new ConcurrentHashMap<>(256); this.listenerFactory = listenerFactory; - this.pusherConfigManageService = pusherConfigManageService; - this.pusherConfigManageService.registerListener(new ConsumerUpdateListenerImpl()); + this.targetRunnerConfigObserver = targetRunnerConfigObserver; + this.targetRunnerConfigObserver.registerListener(this); } /** * init listen consumer + * * @param taskConfig */ - public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig){ - if(MapUtils.isEmpty(taskConfig)){ + public void initOrUpdateListenConsumer(Map<String, List<TargetKeyValue>> taskConfig) { + if (MapUtils.isEmpty(taskConfig)) { logger.warn("initListenConsumer by taskConfig param is empty"); return; } List<TargetKeyValue> targetKeyValues = initTaskKeyInfo(taskConfig); this.topics.addAll(listenerFactory.parseTopicListByList(targetKeyValues)); - for (String topic : topics){ + for (String topic : topics) { DefaultLitePullConsumer pullConsumer = listenerFactory.initDefaultMQPullConsumer(topic); listenConsumer.add(pullConsumer); } @@ -97,12 +110,13 @@ public class EventBusListener extends ServiceThread { /** * init all task config info + * * @param taskConfig * @return */ private List<TargetKeyValue> initTaskKeyInfo(Map<String, List<TargetKeyValue>> taskConfig) { Set<TargetKeyValue> targetKeyValues = new HashSet<>(); - for(String connectName : taskConfig.keySet()){ + for (String connectName : taskConfig.keySet()) { targetKeyValues.addAll(taskConfig.get(connectName)); } return Lists.newArrayList(targetKeyValues); @@ -110,13 +124,13 @@ public class EventBusListener extends ServiceThread { @Override public void run() { - while (!stopped){ - if(CollectionUtils.isEmpty(listenConsumer)){ + while (!stopped) { + if (CollectionUtils.isEmpty(listenConsumer)) { logger.info("current listen consumer is empty"); this.waitForRunning(1000); continue; } - for(DefaultLitePullConsumer pullConsumer : listenConsumer) { + for (DefaultLitePullConsumer pullConsumer : listenConsumer) { executorService.submit(() -> { try { List<MessageExt> messageExts = pullConsumer.poll(3000); @@ -144,6 +158,7 @@ public class EventBusListener extends ServiceThread { /** * MessageExt convert to connect record + * * @param messageExt * @return */ @@ -175,14 +190,21 @@ public class EventBusListener extends ServiceThread { /** * consumer update listener */ - class ConsumerUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener { - - @Override - public void onConfigUpdate(PusherTargetEntity targetEntity) { - logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetEntity)); - Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>(); - lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues()); - initOrUpdateListenConsumer(lastTargetMap); - } + @Override + public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) { + logger.info("consumer update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig)); + Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>(); + lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues()); + initOrUpdateListenConsumer(lastTargetMap); } + + @Override + public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) { + } + + @Override + public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { + + } + } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java index f14fdec..be73b2a 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java @@ -19,44 +19,54 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot; import com.alibaba.fastjson.JSON; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.transfer.TransformEngine; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.*; - /** * receive event and transfer the rule to pusher */ -public class EventRuleTransfer extends ServiceThread { +public class EventRuleTransfer extends ServiceThread implements TargetRunnerListener { private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class); private ListenerFactory listenerFactory; - private PusherConfigManageService pusherConfigManageService; + private TargetRunnerConfigObserver targetRunnerConfigObserver; private Plugin plugin; Map<TargetKeyValue/*taskConfig*/, TransformEngine<ConnectRecord>/*taskTransform*/> taskTransformMap = new ConcurrentHashMap<>(20); - private ExecutorService executorService = new ThreadPoolExecutor(20,60, 1000,TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100)); + private ExecutorService executorService = new ThreadPoolExecutor(20, 60, 1000, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>(100)); - public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){ + public EventRuleTransfer(Plugin plugin, ListenerFactory listenerFactory, + TargetRunnerConfigObserver targetRunnerConfigObserver) { this.plugin = plugin; this.listenerFactory = listenerFactory; - this.pusherConfigManageService = pusherConfigManageService; - this.pusherConfigManageService.registerListener(new TransformUpdateListenerImpl()); + this.targetRunnerConfigObserver = targetRunnerConfigObserver; + this.targetRunnerConfigObserver.registerListener(this); } - public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig){ + public void initOrUpdateTaskTransform(Map<String, List<TargetKeyValue>> taskConfig) { this.taskTransformMap.putAll(initSinkTaskTransformInfo(taskConfig)); } @@ -67,9 +77,9 @@ public class EventRuleTransfer extends ServiceThread { @Override public void run() { - while (!stopped){ + while (!stopped) { ConnectRecord eventRecord = listenerFactory.takeEventRecord(); - if(Objects.isNull(eventRecord)){ + if (Objects.isNull(eventRecord)) { logger.info("listen eventRecord is empty, continue by curTime - {}", System.currentTimeMillis()); this.waitForRunning(1000); continue; @@ -77,16 +87,16 @@ public class EventRuleTransfer extends ServiceThread { executorService.submit(() -> { // extension add sub // rule - target - for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()){ + for (TargetKeyValue targetKeyValue : taskTransformMap.keySet()) { // add threadPool for cup task // attention coreSize TransformEngine<ConnectRecord> transformEngine = taskTransformMap.get(targetKeyValue); ConnectRecord transformRecord = transformEngine.doTransforms(eventRecord); - if(Objects.isNull(transformRecord)){ + if (Objects.isNull(transformRecord)) { continue; } // a bean for maintain - Map<TargetKeyValue,ConnectRecord> targetMap = new HashMap<>(); + Map<TargetKeyValue, ConnectRecord> targetMap = new HashMap<>(); targetMap.put(targetKeyValue, transformRecord); listenerFactory.offerTargetTaskQueue(targetMap); @@ -101,20 +111,20 @@ public class EventRuleTransfer extends ServiceThread { } /** - * Init sink task transform map - * key: task config - * value: transformEngine + * Init sink task transform map key: task config value: transformEngine + * * @param taskConfig * @return */ - private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo(Map<String, List<TargetKeyValue>> taskConfig) { + private Map<TargetKeyValue, TransformEngine<ConnectRecord>> initSinkTaskTransformInfo( + Map<String, List<TargetKeyValue>> taskConfig) { Map<TargetKeyValue, TransformEngine<ConnectRecord>> curTaskTransformMap = new HashMap<>(); Set<TargetKeyValue> allTaskKeySet = new HashSet<>(); - for(String connectName : taskConfig.keySet()){ + for (String connectName : taskConfig.keySet()) { List<TargetKeyValue> taskKeyList = taskConfig.get(connectName); allTaskKeySet.addAll(new HashSet<>(taskKeyList)); } - for(TargetKeyValue keyValue : allTaskKeySet){ + for (TargetKeyValue keyValue : allTaskKeySet) { TransformEngine<ConnectRecord> transformChain = new TransformEngine<>(keyValue, plugin); curTaskTransformMap.put(keyValue, transformChain); } @@ -125,14 +135,20 @@ public class EventRuleTransfer extends ServiceThread { /** * transform update listener */ - class TransformUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener { - - @Override - public void onConfigUpdate(PusherTargetEntity targetEntity) { - logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity)); - Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>(); - lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues()); - initOrUpdateTaskTransform(lastTargetMap); - } + @Override + public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) { + logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig)); + Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>(); + lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues()); + initOrUpdateTaskTransform(lastTargetMap); + } + + @Override + public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) { + } + + @Override + public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { + } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java index d324fe4..bd9ddb0 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java @@ -22,27 +22,32 @@ import com.google.common.collect.Lists; import io.netty.util.internal.ConcurrentSet; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.ListenerFactory; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.pusher.PusherTaskContext; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.PluginClassLoader; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.PusherConfigManageService; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; - /** * event target push to sink task + * * @author artisan */ -public class EventTargetPusher extends ServiceThread { +public class EventTargetPusher extends ServiceThread implements TargetRunnerListener { private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class); @@ -56,29 +61,31 @@ public class EventTargetPusher extends ServiceThread { private ListenerFactory listenerFactory; - private PusherConfigManageService pusherConfigManageService; + private TargetRunnerConfigObserver targetRunnerConfigObserver; private List<SinkTask> pusherTasks = new CopyOnWriteArrayList<>(); - public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, PusherConfigManageService pusherConfigManageService){ + public EventTargetPusher(Plugin plugin, ListenerFactory listenerFactory, + TargetRunnerConfigObserver targetRunnerConfigObserver) { this.plugin = plugin; this.listenerFactory = listenerFactory; - this.pusherConfigManageService = pusherConfigManageService; - this.pusherConfigManageService.registerListener(new TargetUpdateListenerImpl()); + this.targetRunnerConfigObserver = targetRunnerConfigObserver; + this.targetRunnerConfigObserver.registerListener(this); } /** * init running tasks + * * @param taskConfig */ - public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig){ + public void initOrUpdatePusherTask(Map<String, List<TargetKeyValue>> taskConfig) { Set<TargetKeyValue> taskProperty = new HashSet<>(); - for(String connectName : taskConfig.keySet()){ + for (String connectName : taskConfig.keySet()) { List<TargetKeyValue> targetKeyValues = taskConfig.get(connectName); taskProperty.addAll(new HashSet<>(targetKeyValues)); } - for(TargetKeyValue targetKeyValue : taskProperty){ - try{ + for (TargetKeyValue targetKeyValue : taskProperty) { + try { String taskClass = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS); ClassLoader loader = plugin.getPluginClassLoader(taskClass); Class taskClazz; @@ -98,7 +105,7 @@ public class EventTargetPusher extends ServiceThread { Plugin.compareAndSwapLoaders(loader); } logger.info("init target task succeed, target key - {}", JSON.toJSONString(targetKeyValue)); - }catch (Exception exception){ + } catch (Exception exception) { exception.printStackTrace(); } } @@ -106,9 +113,9 @@ public class EventTargetPusher extends ServiceThread { @Override public void run() { - while (!stopped){ + while (!stopped) { Map<TargetKeyValue, ConnectRecord> taskPusher = listenerFactory.takeTargetMap(); - if(MapUtils.isEmpty(taskPusher)){ + if (MapUtils.isEmpty(taskPusher)) { logger.info("current target pusher is empty"); this.waitForRunning(1000); continue; @@ -121,8 +128,8 @@ public class EventTargetPusher extends ServiceThread { // also add in ConnectRecord class system property String taskPushName = targetKeyValue.getString(RuntimerConfigDefine.TASK_CLASS); // add thread pool - for(SinkTask sinkTask : pusherTasks){ - if(sinkTask.getClass().getName().equals(taskPushName)){ + for (SinkTask sinkTask : pusherTasks) { + if (sinkTask.getClass().getName().equals(taskPushName)) { sinkTask.put(Lists.newArrayList(taskPusher.get(targetKeyValue))); } } @@ -137,14 +144,22 @@ public class EventTargetPusher extends ServiceThread { /** * target update listener */ - class TargetUpdateListenerImpl implements PusherConfigManageService.TargetConfigUpdateListener { - - @Override - public void onConfigUpdate(PusherTargetEntity targetEntity) { - logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetEntity)); - Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>(); - lastTargetMap.put(targetEntity.getConnectName(), targetEntity.getTargetKeyValues()); - initOrUpdatePusherTask(lastTargetMap); - } + + @Override + public void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) { + logger.info("transform update by new target config changed, target info -{}", JSON.toJSONString(targetRunnerConfig)); + Map<String, List<TargetKeyValue>> lastTargetMap = new HashMap<>(); + lastTargetMap.put(targetRunnerConfig.getConnectName(), targetRunnerConfig.getTargetKeyValues()); + initOrUpdatePusherTask(lastTargetMap); } + + @Override + public void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) { + } + + @Override + public void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { + + } + } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java index 07f010e..44c29bb 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/ListenerFactory.java @@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.LoggerName; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; @@ -52,7 +52,7 @@ public class ListenerFactory { public static final String QUEUE_OFFSET = "queueOffset"; - private BlockingQueue<PusherTargetEntity> pusherTargetQueue = new LinkedBlockingQueue<>(1000); + private BlockingQueue<TargetRunnerConfig> pusherTargetQueue = new LinkedBlockingQueue<>(1000); private BlockingQueue<MessageExt> eventMessage = new LinkedBlockingQueue(50000); @@ -76,7 +76,7 @@ public class ListenerFactory { return consumer; } - public PusherTargetEntity takeTaskConfig(){ + public TargetRunnerConfig takeTaskConfig(){ if(pusherTargetQueue.isEmpty()){ return null; } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java new file mode 100644 index 0000000..f528b7b --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/TargetRunnerListener.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener; + +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; + +public interface TargetRunnerListener { + + /** + * Call when add new target runner to runtimer. + * + * @param targetRunnerConfig + */ + void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig); + + /** + * Call when the old target runner updated. + * + * @param targetRunnerConfig + */ + void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig); + + /** + * Call when the old target runner deleted from runtimer. + * + * @param targetRunnerConfig + */ + void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig); + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java similarity index 92% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java index dc7ced4..52525cf 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/PusherTargetEntity.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java @@ -26,7 +26,7 @@ import java.util.List; * pusher target key config */ @Data -public class PusherTargetEntity implements Serializable { +public class TargetRunnerConfig implements Serializable { private String connectName; @@ -35,7 +35,7 @@ public class PusherTargetEntity implements Serializable { @Override public boolean equals(Object object){ if (object != null && object.getClass() == this.getClass()) { - PusherTargetEntity targetEntity = (PusherTargetEntity) object; + TargetRunnerConfig targetEntity = (TargetRunnerConfig) object; return this.connectName.equals(targetEntity.getConnectName()) && this.targetKeyValues.size() == targetEntity.getTargetKeyValues().size() && this.targetKeyValues.containsAll(targetEntity.getTargetKeyValues()); diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java new file mode 100644 index 0000000..619bc63 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.service; + +import com.google.common.collect.Sets; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +@Service +public abstract class AbstractTargetRunnerConfigObserver implements TargetRunnerConfigObserver { + + private Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet(); + + /** + * All listeners to trigger while config change. + */ + private Set<TargetRunnerListener> targetRunnerConfigListeners = new HashSet<>(); + + public Set<TargetRunnerConfig> getTargetRunnerConfig() { + return targetRunnerConfigs; + } + + public abstract Set<TargetRunnerConfig> getLatestTargetRunnerConfig(); + + @Override + public void registerListener(TargetRunnerListener listener) { + this.targetRunnerConfigListeners.add(listener); + } + + void onAddTargetRunner(TargetRunnerConfig targetRunnerConfig) { + this.targetRunnerConfigs.add(targetRunnerConfig); + if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) { + return; + } + for (TargetRunnerListener listener : this.targetRunnerConfigListeners) { + listener.onAddTargetRunner(targetRunnerConfig); + } + } + + /** + * Call when the old target runner updated. + * + * @param targetRunnerConfig + */ + void onUpdateTargetRunner(TargetRunnerConfig targetRunnerConfig) { + this.targetRunnerConfigs.add(targetRunnerConfig); + if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) { + return; + } + for (TargetRunnerListener listener : this.targetRunnerConfigListeners) { + listener.onUpdateTargetRunner(targetRunnerConfig); + } + } + + /** + * Call when the old target runner deleted from runtimer. + * + * @param targetRunnerConfig + */ + void onDeleteTargetRunner(TargetRunnerConfig targetRunnerConfig) { + this.targetRunnerConfigs.remove(targetRunnerConfig); + if (CollectionUtils.isEmpty(this.targetRunnerConfigListeners)) { + return; + } + for (TargetRunnerListener listener : this.targetRunnerConfigListeners) { + listener.onDeleteTargetRunner(targetRunnerConfig); + } + } + + public Map<String, List<TargetKeyValue>> getTaskConfigs() { + return null; + } +} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java deleted file mode 100644 index 58a66fd..0000000 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageServiceImpl.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.eventbridge.adapter.runtimer.service; - -import com.google.common.collect.Lists; -import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.component.connector.Connector; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.FilePathConfigUtil; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.FileBaseKeyValueStore; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.store.KeyValueStore; -import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; -import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.JsonConverter; -import org.apache.rocketmq.eventbridge.adapter.runtimer.converter.ListConverter; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; - -import javax.annotation.PostConstruct; -import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; - -@Service -public class PusherConfigManageServiceImpl implements PusherConfigManageService { - - /** - * plugin for recognize class loader - */ - private Plugin plugin; - - /** - * Current connector configs in the store. - */ - private KeyValueStore<String, TargetKeyValue> connectorKeyValueStore; - - /** - * Current task configs in the store. - */ - private KeyValueStore<String, List<TargetKeyValue>> taskKeyValueStore; - - /** - * All listeners to trigger while config change. - */ - private Set<TargetConfigUpdateListener> targetConfigUpdateListeners; - - private Set<String> connectTopicNames; - - @Value("${runtimer.storePathRootDir:}") - private String storeRootPath; - - public PusherConfigManageServiceImpl(Plugin plugin){ - this.plugin = plugin; - this.connectTopicNames = new CopyOnWriteArraySet<>(); - this.targetConfigUpdateListeners = new HashSet<>(); - } - - @PostConstruct - public void initStoreKeyValue(){ - this.connectorKeyValueStore = new FileBaseKeyValueStore<>( - FilePathConfigUtil.getConnectorConfigPath(this.storeRootPath), - new JsonConverter(), - new JsonConverter(TargetKeyValue.class)); - this.taskKeyValueStore = new FileBaseKeyValueStore<>( - FilePathConfigUtil.getTaskConfigPath(this.storeRootPath), - new JsonConverter(), - new ListConverter(TargetKeyValue.class)); - this.connectorKeyValueStore.load(); - this.taskKeyValueStore.load(); - } - - /** - * get all connector configs enabled - * - * @return - */ - @Override - public Map<String, TargetKeyValue> getConnectorConfigs() { - Map<String, TargetKeyValue> result = new HashMap<>(); - Map<String, TargetKeyValue> connectorConfigs = connectorKeyValueStore.getKVMap(); - for (String connectorName : connectorConfigs.keySet()) { - TargetKeyValue config = connectorConfigs.get(connectorName); - if (0 != config.getInt(RuntimerConfigDefine.CONFIG_DELETED)) { - continue; - } - result.put(connectorName, config); - } - return result; - } - - @Override - public String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception { - TargetKeyValue exist = connectorKeyValueStore.get(connectorName); - if (null != exist) { - Long updateTimestamp = exist.getLong(RuntimerConfigDefine.UPDATE_TIMESTAMP); - if (null != updateTimestamp) { - configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, updateTimestamp); - } - } - if (configs.equals(exist)) { - return "Connector with same config already exist."; - } - - Long currentTimestamp = System.currentTimeMillis(); - configs.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp); - for (String requireConfig : RuntimerConfigDefine.REQUEST_CONFIG) { - if (!configs.containsKey(requireConfig)) { - return "Request config key: " + requireConfig; - } - } - - String connectorClass = configs.getString(RuntimerConfigDefine.CONNECTOR_CLASS); - ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass); - Class clazz; - if (null != classLoader) { - clazz = Class.forName(connectorClass, true, classLoader); - } else { - clazz = Class.forName(connectorClass); - } - final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance(); - connector.validate(configs); - connector.init(configs); - connectorKeyValueStore.put(connectorName, configs); - recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs); - return ""; - } - - @Override - public void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs) { - int maxTask = configs.getInt(RuntimerConfigDefine.MAX_TASK, 1); - List<KeyValue> taskConfigs = connector.taskConfigs(maxTask); - List<TargetKeyValue> converterdConfigs = new ArrayList<>(); - for (KeyValue keyValue : taskConfigs) { - TargetKeyValue newKeyValue = new TargetKeyValue(); - for (String key : keyValue.keySet()) { - newKeyValue.put(key, keyValue.getString(key)); - } - newKeyValue.put(RuntimerConfigDefine.TASK_CLASS, connector.taskClass().getName()); - newKeyValue.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, currentTimestamp); - - newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAME, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME)); - newKeyValue.put(RuntimerConfigDefine.CONNECT_TOPICNAMES, configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAMES)); - Set<String> connectConfigKeySet = configs.keySet(); - for (String connectConfigKey : connectConfigKeySet) { - if (connectConfigKey.startsWith(RuntimerConfigDefine.TRANSFORMS)) { - newKeyValue.put(connectConfigKey, configs.getString(connectConfigKey)); - } - } - converterdConfigs.add(newKeyValue); - connectTopicNames.add(configs.getString(RuntimerConfigDefine.CONNECT_TOPICNAME)); - } - putTaskConfigs(connectorName, converterdConfigs); - } - - @Override - public void removeConnectorConfig(String connectorName) { - TargetKeyValue config = connectorKeyValueStore.get(connectorName); - if(Objects.isNull(config)){ - return; - } - config.put(RuntimerConfigDefine.UPDATE_TIMESTAMP, System.currentTimeMillis()); - config.put(RuntimerConfigDefine.CONFIG_DELETED, 1); - List<TargetKeyValue> taskConfigList = taskKeyValueStore.get(connectorName); - taskConfigList.add(config); - connectorKeyValueStore.put(connectorName, config); - putTaskConfigs(connectorName, taskConfigList); - } - - @Override - public Map<String, List<TargetKeyValue>> getTaskConfigs() { - Map<String, List<TargetKeyValue>> result = new HashMap<>(); - Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap(); - Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs(); - for (String connectorName : taskConfigs.keySet()) { - if (!filteredConnector.containsKey(connectorName)) { - continue; - } - result.put(connectorName, taskConfigs.get(connectorName)); - } - return result; - } - - @Override - public Set<PusherTargetEntity> getTargetInfo() { - Set<PusherTargetEntity> result = new HashSet<>(); - Map<String, List<TargetKeyValue>> taskConfigs = taskKeyValueStore.getKVMap(); - Map<String, TargetKeyValue> filteredConnector = getConnectorConfigs(); - for (String connectorName : taskConfigs.keySet()) { - if (!filteredConnector.containsKey(connectorName)) { - continue; - } - PusherTargetEntity targetEntity = new PusherTargetEntity(); - targetEntity.setConnectName(connectorName); - targetEntity.setTargetKeyValues(taskConfigs.get(connectorName)); - result.add(targetEntity); - } - return result; - } - - @Override - public List<String> getConnectTopics(){ - if(CollectionUtils.isEmpty(connectTopicNames)){ - return Lists.newArrayList(); - } - return Lists.newArrayList(connectTopicNames); - } - - @Override - public void persist() { - this.connectorKeyValueStore.persist(); - this.taskKeyValueStore.persist(); - } - - @Override - public void registerListener(TargetConfigUpdateListener listener) { - this.targetConfigUpdateListeners.add(listener); - } - - /** - * put target task key config for update - * @param connectorName - * @param configs - */ - private void putTaskConfigs(String connectorName, List<TargetKeyValue> configs) { - List<TargetKeyValue> exist = taskKeyValueStore.get(connectorName); - if (null != exist && exist.size() > 0) { - taskKeyValueStore.remove(connectorName); - } - taskKeyValueStore.put(connectorName, configs); - PusherTargetEntity targetEntity = new PusherTargetEntity(); - targetEntity.setConnectName(connectorName); - targetEntity.setTargetKeyValues(configs); - triggerListener(targetEntity); - persistStore(); - } - - private void persistStore() { - - } - - /** - * trigger new target task config for update - * @param pusherTargetEntity - */ - private void triggerListener(PusherTargetEntity pusherTargetEntity) { - if (CollectionUtils.isEmpty(this.targetConfigUpdateListeners)) { - return; - } - - for (TargetConfigUpdateListener listener : this.targetConfigUpdateListeners) { - listener.onConfigUpdate(pusherTargetEntity); - } - } - -} diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java similarity index 50% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java index 88a2001..c6c7670 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/PusherConfigManageService.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigObserver.java @@ -17,82 +17,31 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.service; -import io.openmessaging.connector.api.component.connector.Connector; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.PusherTargetEntity; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; - import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; /** * manage the pusher connector/task config info */ -public interface PusherConfigManageService { - - /** - * Get all connector configs - * - * @return - */ - Map<String, TargetKeyValue> getConnectorConfigs(); - - /** - * Put the configs. - * - * @param connectorName - * @param configs - * @return - * @throws Exception - */ - String putConnectTargetConfig(String connectorName, TargetKeyValue configs) throws Exception; - - /** - * Remove the connector - * - * @param connectorName - */ - void removeConnectorConfig(String connectorName); - - void recomputeTaskConfigs(String connectorName, Connector connector, Long currentTimestamp, TargetKeyValue configs); +public interface TargetRunnerConfigObserver { /** - * Get all Task configs. - * + * Get the target runner config of runtimer. * @return */ - Map<String, List<TargetKeyValue>> getTaskConfigs(); - - /** - * get target info - * @return - */ - Set<PusherTargetEntity> getTargetInfo(); - - /** - * Get all topics - * @return - */ - List<String> getConnectTopics(); - - /** - * Persist all the configs in a store. - */ - void persist(); + Set<TargetRunnerConfig> getTargetRunnerConfig(); /** * Register a listener to listen all config update operations. * * @param listener */ - void registerListener(TargetConfigUpdateListener listener); - - interface TargetConfigUpdateListener { - - /** - * Invoke while connector config changed. - */ - void onConfigUpdate(PusherTargetEntity targetEntity); - } + void registerListener(TargetRunnerListener listener); + @Deprecated + Map<String, List<TargetKeyValue>> getTaskConfigs(); } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java new file mode 100644 index 0000000..f51ac50 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnControllerObserver.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.service; + +import com.google.common.collect.Sets; +import java.util.List; +import java.util.Map; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; + +@Slf4j +public class TargetRunnerConfigOnControllerObserver extends AbstractTargetRunnerConfigObserver { + + @Override + public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() { + return Sets.newHashSet(); + } + + public void add(TargetRunnerConfig targetRunnerConfig) { + super.onAddTargetRunner(targetRunnerConfig); + } + + public void update(TargetRunnerConfig targetRunnerConfig) { + super.onUpdateTargetRunner(targetRunnerConfig); + } + + public void delete(TargetRunnerConfig targetRunnerConfig) { + super.onDeleteTargetRunner(targetRunnerConfig); + } + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java new file mode 100644 index 0000000..3d847a1 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.service; + +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; + +@Slf4j +public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver { + + private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.newThreadFactory("TargetRunnerConfigOnDBObserver", false)); + + public TargetRunnerConfigOnDBObserver(Plugin plugin) { + this.addListen(this); + } + + @Override + public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() { + return null; + } + + public void addListen( + TargetRunnerConfigOnDBObserver pusherConfigOnFileService) { + service.scheduleAtFixedRate(() -> { + try { + Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig(); + Set<TargetRunnerConfig> last = super.getTargetRunnerConfig(); + TargetRunnerConfig changed = null; + super.onAddTargetRunner(changed); + super.onUpdateTargetRunner(changed); + super.onDeleteTargetRunner(changed); + } catch (Throwable e) { + log.error("Watch failed.", e); + } + }, 0, 3, TimeUnit.SECONDS); + } + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java new file mode 100644 index 0000000..27269b8 --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.service; + +import java.nio.file.FileSystems; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.plugin.Plugin; +import org.springframework.beans.factory.annotation.Value; + +@Slf4j +public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver { + + @Value("${runtimer.storePathRootDir:}") + private String storeRootPath; + + @Value("${runtimer.store.targetRunner.config:targetRunner-config}") + private String fileName; + + private static ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor( + ThreadUtils.newThreadFactory("TargetRunnerConfigOnFileObserver", false)); + + public TargetRunnerConfigOnFileObserver(Plugin plugin) { + this.addListen(storeRootPath, fileName, this); + } + + @Override + public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() { + return null; + } + + public void addListen(String pathStr, String fileName, + TargetRunnerConfigOnFileObserver pusherConfigOnFileService) { + log.info("Watching task file changing:{}", pathStr + fileName); + service.scheduleAtFixedRate(() -> { + try { + WatchService watchService = FileSystems.getDefault() + .newWatchService(); + Path path = Paths.get(pathStr); + path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY); + WatchKey watchKey; + while (true) { + watchKey = watchService.take(); + if (watchKey != null && !watchKey.pollEvents() + .isEmpty()) { + log.info("Watched the file changed events."); + pusherConfigOnFileService.diff(); + } + watchKey.reset(); + } + } catch (Throwable e) { + log.error("Watch file failed.", e); + } + }, 0, 3, TimeUnit.SECONDS); + } + + public void diff() { + Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig(); + Set<TargetRunnerConfig> last = super.getTargetRunnerConfig(); + TargetRunnerConfig changed = null; + super.onAddTargetRunner(changed); + super.onUpdateTargetRunner(changed); + super.onDeleteTargetRunner(changed); + } + +} \ No newline at end of file
