This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 667111e6b998a7c8007aeee3d48b1b0245cfaabe Author: 2011shenlin <[email protected]> AuthorDate: Sun Apr 23 16:36:29 2023 +0800 feat:support record commit. --- .../eventbridge/adapter/runtimer/Runtimer.java | 36 +++++----- .../adapter/runtimer/boot/EventRuleTransfer.java | 76 +++++++++++++--------- .../adapter/runtimer/boot/EventTargetPusher.java | 20 +++--- .../OffsetManager.java} | 25 +++++-- .../runtimer/boot/listener/EventSubscriber.java | 10 +++ .../boot/listener/RocketMQEventSubscriber.java | 4 ++ .../adapter/runtimer/common/ServiceThread.java | 1 + .../runtimer/config/RuntimerConfiguration.java | 41 ++++++++++++ .../{retry => error}/DeadLetterQueueService.java | 2 +- .../runtimer/{retry => error}/ErrorHandler.java | 9 ++- .../adapter/runtimer/retry/EventBusStorage.java | 25 ------- .../service/TargetRunnerConfigOnFileObserver.java | 2 + 12 files changed, 155 insertions(+), 96 deletions(-) diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java index 7eb922d..495aeee 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/Runtimer.java @@ -22,17 +22,14 @@ import javax.annotation.PostConstruct; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventBusListener; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventRuleTransfer; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.EventTargetPusher; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; -import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.RocketMQEventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.RuntimerState; -import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.ConfigModeEnum; import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnDBObserver; -import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -49,30 +46,33 @@ public class Runtimer { private CirculatorContext circulatorContext; + @Autowired private TargetRunnerConfigObserver runnerConfigObserver; + @Autowired + private OffsetManager offsetManager; + @Autowired + private EventSubscriber eventSubscriber; - public Runtimer(CirculatorContext circulatorContext, @Value("${rumtimer.config.mode}") String configMode) { + public Runtimer( + CirculatorContext circulatorContext, + TargetRunnerConfigObserver runnerConfigObserver, + OffsetManager offsetManager, + EventSubscriber eventSubscriber) { this.circulatorContext = circulatorContext; - switch (ConfigModeEnum.parse(configMode)) { - case DB: - this.runnerConfigObserver = new TargetRunnerConfigOnDBObserver(); - break; - default: - this.runnerConfigObserver = new TargetRunnerConfigOnFileObserver(); - break; - } + this.runnerConfigObserver = runnerConfigObserver; + this.offsetManager = offsetManager; + this.eventSubscriber = eventSubscriber; } @PostConstruct public void initAndStart() { - logger.info("init runtimer task config"); + logger.info("Start init runtimer."); circulatorContext.initListenerMetadata(runnerConfigObserver.getTargetRunnerConfig()); - EventSubscriber eventSubscriber = new RocketMQEventSubscriber(runnerConfigObserver); runnerConfigObserver.registerListener(circulatorContext); runnerConfigObserver.registerListener(eventSubscriber); new EventBusListener(circulatorContext, eventSubscriber).start(); - new EventRuleTransfer(circulatorContext).start(); - new EventTargetPusher(circulatorContext).start(); + new EventRuleTransfer(circulatorContext, offsetManager).start(); + new EventTargetPusher(circulatorContext, offsetManager).start(); startRuntimer(); } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java index 635c4bd..06f1ce3 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventRuleTransfer.java @@ -20,6 +20,13 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.boot; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; @@ -28,13 +35,8 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.*; -import java.util.stream.Collectors; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; /** * receive event and transfer the rule to pusher @@ -43,10 +45,14 @@ public class EventRuleTransfer extends ServiceThread { private static final Logger logger = LoggerFactory.getLogger(EventRuleTransfer.class); + @Autowired private final CirculatorContext circulatorContext; + @Autowired + private final OffsetManager offsetManager; - public EventRuleTransfer(CirculatorContext circulatorContext) { + public EventRuleTransfer(CirculatorContext circulatorContext, OffsetManager offsetManager) { this.circulatorContext = circulatorContext; + this.offsetManager = offsetManager; } @Override @@ -54,6 +60,10 @@ public class EventRuleTransfer extends ServiceThread { return this.getClass().getSimpleName(); } + @PostConstruct + public void init(){ + super.start(); + } @Override public void run() { while (!stopped) { @@ -64,7 +74,7 @@ public class EventRuleTransfer extends ServiceThread { continue; } Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap(); - if(MapUtils.isEmpty(latestTransformMap)){ + if (MapUtils.isEmpty(latestTransformMap)) { logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); this.waitForRunning(3000); continue; @@ -73,38 +83,40 @@ public class EventRuleTransfer extends ServiceThread { String eventChannelName = RuntimerConfigDefine.CHANNEL_NAME; String eventChannel = eventRecord.getExtension(eventChannelName); Set<TransformEngine<ConnectRecord>> adaptTransformSet = latestTransformMap.values().stream() - .filter(engine -> eventChannel.equals(engine.getConnectConfig(eventChannelName))) - .collect(Collectors.toSet()); - if(CollectionUtils.isEmpty(adaptTransformSet)){ - logger.warn("adapt specific topic ref transform engine is empty, eventChannelName- {}", eventChannel); - this.waitForRunning(3000); - continue; + .filter(engine -> eventChannel.equals(engine.getConnectConfig(eventChannelName))) + .collect(Collectors.toSet()); + if (CollectionUtils.isEmpty(adaptTransformSet)) { + logger.warn("adapt specific topic ref transform engine is empty, eventChannelName- {}", eventChannel); + this.waitForRunning(3000); + continue; } List<ConnectRecord> afterTransformConnect = Lists.newArrayList(); List<CompletableFuture<Void>> completableFutures = Lists.newArrayList(); - adaptTransformSet.forEach(transfer->{ - CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(()-> transfer.doTransforms(eventRecord)) - .exceptionally((exception) -> { - logger.error("transfer do transform event record failed,stackTrace-", exception); - return null; - }) - .thenAccept(record-> { - if(Objects.nonNull(record)){ - String runnerNameKey = RuntimerConfigDefine.RUNNER_NAME; - String taskClassKey = RuntimerConfigDefine.TASK_CLASS; - record.getExtensions().put(runnerNameKey, transfer.getConnectConfig(runnerNameKey)); - record.getExtensions().put(taskClassKey, transfer.getConnectConfig(taskClassKey)); - afterTransformConnect.add(record); - } - }); + adaptTransformSet.forEach(transfer -> { + CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> transfer.doTransforms(eventRecord)) + .exceptionally((exception) -> { + logger.error("transfer do transform event record failed,stackTrace-", exception); + return null; + }) + .thenAccept(record -> { + if (Objects.nonNull(record)) { + String runnerNameKey = RuntimerConfigDefine.RUNNER_NAME; + String taskClassKey = RuntimerConfigDefine.TASK_CLASS; + record.getExtensions().put(runnerNameKey, transfer.getConnectConfig(runnerNameKey)); + record.getExtensions().put(taskClassKey, transfer.getConnectConfig(taskClassKey)); + afterTransformConnect.add(record); + }else{ + offsetManager.commit(eventRecord); + } + }); completableFutures.add(transformFuture); }); - try{ + try { CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[adaptTransformSet.size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); - }catch (Exception exception){ + } catch (Exception exception) { logger.error("transfer event record failed, stackTrace-", exception); } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java index 43e6b29..e420ee2 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java @@ -21,29 +21,29 @@ import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import io.openmessaging.connector.api.component.task.sink.SinkTask; import io.openmessaging.connector.api.data.ConnectRecord; +import java.util.Objects; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; - /** * event target push to sink task * * @author artisan */ -public class EventTargetPusher extends ServiceThread{ +public class EventTargetPusher extends ServiceThread { private static final Logger logger = LoggerFactory.getLogger(EventTargetPusher.class); private final CirculatorContext circulatorContext; + private final OffsetManager offsetManager; - public EventTargetPusher(CirculatorContext circulatorContext) { + public EventTargetPusher(CirculatorContext circulatorContext, OffsetManager offsetManager) { this.circulatorContext = circulatorContext; + this.offsetManager = offsetManager; } @Override @@ -55,7 +55,7 @@ public class EventTargetPusher extends ServiceThread{ this.waitForRunning(1000); continue; } - if(logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecord)); } @@ -63,9 +63,10 @@ public class EventTargetPusher extends ServiceThread{ executorService.execute(() -> { try { String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME); - SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);; + SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName); sinkTask.put(Lists.newArrayList(targetRecord)); - }catch (Exception exception){ + offsetManager.commit(targetRecord); + } catch (Exception exception) { logger.error(getServiceName() + " push target exception, record - " + targetRecord + " , stackTrace-", exception); } }); @@ -77,5 +78,4 @@ public class EventTargetPusher extends ServiceThread{ return EventTargetPusher.class.getSimpleName(); } - } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/OffsetManager.java similarity index 54% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/OffsetManager.java index 94d8d4a..0613efb 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorageOnRocketMQ.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/OffsetManager.java @@ -15,20 +15,31 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; +package org.apache.rocketmq.eventbridge.adapter.runtimer.boot; +import com.google.common.collect.Lists; import io.openmessaging.connector.api.data.ConnectRecord; import java.util.List; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; -public class EventBusStorageOnRocketMQ implements EventBusStorage { +@Component +public class OffsetManager { - @Override - public void put(String eventBusName, ConnectRecord connectRecord, int delaySec) { + @Autowired + EventSubscriber eventSubscriber; + public OffsetManager(EventSubscriber eventSubscriber) { + this.eventSubscriber = eventSubscriber; } - public List<String> parseEventBusName(String eventBusName) { - //TODO - return null; + public void commit(final List<ConnectRecord> connectRecordList) { + this.eventSubscriber.commit(connectRecordList); } + + public void commit(final ConnectRecord connectRecord) { + this.eventSubscriber.commit(Lists.newArrayList(connectRecord)); + } + } \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java index f24ce08..39d08d7 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/EventSubscriber.java @@ -46,6 +46,16 @@ public abstract class EventSubscriber implements TargetRunnerListener { */ public abstract void commit(List<ConnectRecord> connectRecordList); + /** + * Put the connect record to the eventbus. + * @param eventBusName + * @param connectRecord + * @param delaySec + */ + public void put(String eventBusName, ConnectRecord connectRecord, int delaySec){ + // convert the eventBusName to Topic ? + } + /** * Call when add new target runner to runtimer. * diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java index aa263c7..b4c7527 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java @@ -49,6 +49,7 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.proxy.SocksProxyConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.support.PropertiesLoaderUtils; import java.nio.charset.StandardCharsets; @@ -60,16 +61,19 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; +import org.springframework.stereotype.Component; /** * RocketMQ implement event subscriber */ +@Component public class RocketMQEventSubscriber extends EventSubscriber { private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class); private LitePullConsumer pullConsumer; + @Autowired private final TargetRunnerConfigObserver runnerConfigObserver; private Integer pullTimeOut; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java index 33f4d9d..566d8e2 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/ServiceThread.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.eventbridge.adapter.runtimer.common; +import javax.annotation.PostConstruct; import org.apache.rocketmq.common.CountDownLatch2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfiguration.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfiguration.java new file mode 100644 index 0000000..7160cca --- /dev/null +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/config/RuntimerConfiguration.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtimer.config; + +import org.apache.rocketmq.eventbridge.adapter.runtimer.common.enums.ConfigModeEnum; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigObserver; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnDBObserver; +import org.apache.rocketmq.eventbridge.adapter.runtimer.service.TargetRunnerConfigOnFileObserver; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RuntimerConfiguration { + + @Bean(name = "runnerConfigObserver") + public TargetRunnerConfigObserver targetRunnerConfigObserver(@Value("${rumtimer.config.mode}") String configMode) { + switch (ConfigModeEnum.parse(configMode)) { + case DB: + return new TargetRunnerConfigOnDBObserver(); + default: + return new TargetRunnerConfigOnFileObserver(); + } + } + +} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/DeadLetterQueueService.java similarity index 92% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/DeadLetterQueueService.java index ce6a642..fd108c9 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/DeadLetterQueueService.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/DeadLetterQueueService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; +package org.apache.rocketmq.eventbridge.adapter.runtimer.error; public class DeadLetterQueueService { diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java similarity index 91% rename from adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java rename to adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java index 412b538..16bdbb9 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/ErrorHandler.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java @@ -15,24 +15,27 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; +package org.apache.rocketmq.eventbridge.adapter.runtimer.error; import com.google.common.base.Strings; import io.openmessaging.connector.api.data.ConnectRecord; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.EventSubscriber; import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.CONNECT_RECORDS_KEY; import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.RUNNER_NAME; @Slf4j +@Component public class ErrorHandler { @Autowired - EventBusStorage eventBusStorage; + EventSubscriber eventSubscriber; public void handle(ConnectRecord connectRecord, Throwable t) { String eventRunnerName = connectRecord.getExtension(RUNNER_NAME); @@ -42,7 +45,7 @@ public class ErrorHandler { int retryTimes = parseRetryTimes(connectRecord); int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum); if (delaySec > 0) { - eventBusStorage.put(eventBusName, connectRecord, delaySec); + eventSubscriber.put(eventBusName, connectRecord, delaySec); } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java deleted file mode 100644 index cbec332..0000000 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/retry/EventBusStorage.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.eventbridge.adapter.runtimer.retry; - -import io.openmessaging.connector.api.data.ConnectRecord; - -public interface EventBusStorage { - - void put(String eventBusName, ConnectRecord connectRecord, int delaySec); -} \ No newline at end of file diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java index 63f1e9d..791efc1 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java @@ -41,9 +41,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; +import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; @Slf4j +@Component public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfigObserver { private String pathName;
