This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch runtimer in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit c6d5b8dbbe7f93e9d86eb9727fe70cdef7c3b452 Author: 2011shenlin <[email protected]> AuthorDate: Sun Jun 11 23:19:10 2023 +0800 feat:add runtime demo. --- README.md | 10 +- .../adapter/api/controller/EventBusController.java | 2 +- .../adapter/api/dto/bus/ListEventBusesRequest.java | 2 +- .../adapter/persistence/FlywayConfig.java | 45 ++++ .../MybatisEventTargetRunnerRepository.java | 3 - .../adapter/runtime/boot/EventRuleTransfer.java | 53 ++-- .../adapter/runtime/boot/EventTargetTrigger.java | 2 +- .../service/TargetRunnerConfigOnDBObserver.java | 15 +- .../rocketmq/impl/RocketMQEventDataRepository.java | 10 +- .../rocketmq/runtimer/RocketMQEventSubscriber.java | 6 +- docs/CreateDingTalkTarget.md | 86 ------- docs/CreateFileTarget.md | 51 ++++ docs/cn/images/demo.png | Bin 0 -> 49258 bytes .../domain/storage/EventDataRepository.java | 7 + .../validate/DefaultAuthValidation.java | 2 +- pom.xml | 11 + start/pom.xml | 4 + .../java/org/apache/rocketmq/eventbridge/Main.java | 3 +- start/src/main/resources/application.properties | 5 +- {start => test/demo}/pom.xml | 129 +++++----- .../rocketmq/eventbridge/demo/DefaultDemo.java | 105 ++++++++ .../ApiDestinationDTOControllerTest.java | 187 ++++++++++++++ .../api/controller/ConnectionControllerTest.java | 285 +++++++++++++++++++++ .../api/converter/EventConverterAdapterTest.java | 120 +++++++++ .../api/converter/EventTargetConverterTest.java | 107 ++++++++ .../api/converter/EventTargetDTOConverterTest.java | 96 +++++++ .../adapter/api/handler/EventDataHandlerTest.java | 112 ++++++++ test/pom.xml | 28 ++ 28 files changed, 1278 insertions(+), 208 deletions(-) diff --git a/README.md b/README.md index 505be7d..6dac851 100644 --- a/README.md +++ b/README.md @@ -88,13 +88,13 @@ curl -X POST http://127.0.0.1:7001/putEvents \ -H "ce-datacontenttype:application/json" \ -H "ce-time:2018-04-05T17:31:00Z" \ -H "ce-aliyuneventbusname:demo-bus" \ --d 'test' +-d 'A test recrod.' ``` * Check if the local file received a write event -In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo.eventbridge +In addition, by default, the system will create a demo rule for you to subscribe and push to the file. You can check whether there are events received in the directory:~/demo + - - ->> 链接:为什么输出:test,... +Why does the file output the data attribute of CloudEvent instead of other attributes?This is because the configuration in the demo rule is to output "$.data" in CloudEvent to the file line. +You can refer to this [document](docs/CreateFileTarget.md) to configure and modify event targets. diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java index 8f8388c..79c4a21 100644 --- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java +++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java @@ -95,7 +95,7 @@ public class EventBusController { paginationResult.getData() .forEach(eventBus -> { EventBusDTO eventBusDTO = new EventBusDTO(); - eventBusDTO.setEventBusName(eventBusDTO.getEventBusName()); + eventBusDTO.setEventBusName(eventBus.getName()); eventBusDTO.setDescription(eventBus.getDescription()); eventBuses.add(eventBusDTO); }); diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java index 48b0932..d1590fe 100644 --- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java +++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/bus/ListEventBusesRequest.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.eventbridge.adapter.api.dto.BaseRequest; public class ListEventBusesRequest extends BaseRequest { @SerializedName("NextToken") - private String nextToken; + private String nextToken = "0"; @SerializedName("MaxResults") private int maxResults; diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java new file mode 100644 index 0000000..5ed64d3 --- /dev/null +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/FlywayConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.persistence; + +import javax.sql.DataSource; +import org.flywaydb.core.Flyway; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class FlywayConfig { + + @Autowired + private DataSource dataSource; + + @Bean("flyway") + public Flyway Flyway() { + Flyway flyway = Flyway.configure() + .dataSource(dataSource) + .cleanDisabled(Boolean.TRUE) + .createSchemas(Boolean.TRUE) + .validateMigrationNaming(Boolean.TRUE) + .baselineOnMigrate(Boolean.TRUE) + .placeholderReplacement(Boolean.FALSE) + .load(); + flyway.migrate(); + return flyway; + } +} \ No newline at end of file diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java index 0c92aa4..97bfb88 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/target/mybatis/repository/MybatisEventTargetRunnerRepository.java @@ -74,9 +74,6 @@ public class MybatisEventTargetRunnerRepository implements EventTargetRunnerRepo @Override public List<EventTargetRunner> listEventTargetRunners(String accountId, String eventBusName, String eventRuleName) { - if (StringUtils.isBlank(accountId) || StringUtils.isBlank(eventBusName) || StringUtils.isBlank(eventRuleName)) { - return Lists.newArrayListWithCapacity(0); - } List<EventTargetRunnerDO> eventTargetRunnerDOS = eventTargetRunnerMapper.listEventTargetRunners(accountId, eventBusName, eventRuleName); if (eventTargetRunnerDOS == null || eventTargetRunnerDOS.isEmpty()) { return Lists.newArrayListWithCapacity(0); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java index d7e41fc..79dfd00 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventRuleTransfer.java @@ -25,10 +25,9 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import javax.annotation.PostConstruct; - import org.apache.commons.collections.MapUtils; -import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.CirculatorContext; +import org.apache.rocketmq.eventbridge.adapter.runtime.boot.common.OffsetManager; import org.apache.rocketmq.eventbridge.adapter.runtime.boot.transfer.TransformEngine; import org.apache.rocketmq.eventbridge.adapter.runtime.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtime.error.ErrorHandler; @@ -68,27 +67,29 @@ public class EventRuleTransfer extends ServiceThread { @Override public void run() { + List<ConnectRecord> afterTransformConnect= Lists.newArrayList(); while (!stopped) { - Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize); - if(MapUtils.isEmpty(eventRecordMap)){ - logger.info("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); - this.waitForRunning(1000); - continue; - } - Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap(); - if (MapUtils.isEmpty(latestTransformMap)) { - logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); - this.waitForRunning(3000); - continue; - } - - List<ConnectRecord> afterTransformConnect = Lists.newArrayList(); - List<CompletableFuture<Void>> completableFutures = Lists.newArrayList(); - for(String runnerName: eventRecordMap.keySet()){ - TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName); - List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName); - curEventRecords.forEach(pullRecord -> { - CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) + try { + Map<String, List<ConnectRecord>> eventRecordMap = circulatorContext.takeEventRecords(batchSize); + if (MapUtils.isEmpty(eventRecordMap)) { + logger.trace("listen eventRecords is empty, continue by curTime - {}", System.currentTimeMillis()); + this.waitForRunning(1000); + continue; + } + Map<String, TransformEngine<ConnectRecord>> latestTransformMap = circulatorContext.getTaskTransformMap(); + if (MapUtils.isEmpty(latestTransformMap)) { + logger.warn("latest transform engine is empty, continue by curTime - {}", System.currentTimeMillis()); + this.waitForRunning(3000); + continue; + } + + afterTransformConnect.clear(); + List<CompletableFuture<Void>> completableFutures = Lists.newArrayList(); + for (String runnerName : eventRecordMap.keySet()) { + TransformEngine<ConnectRecord> curTransformEngine = latestTransformMap.get(runnerName); + List<ConnectRecord> curEventRecords = eventRecordMap.get(runnerName); + curEventRecords.forEach(pullRecord -> { + CompletableFuture<Void> transformFuture = CompletableFuture.supplyAsync(() -> curTransformEngine.doTransforms(pullRecord)) .exceptionally((exception) -> { logger.error("transfer do transform event record failed,stackTrace-", exception); errorHandler.handle(pullRecord, exception); @@ -101,11 +102,9 @@ public class EventRuleTransfer extends ServiceThread { offsetManager.commit(pullRecord); } }); - completableFutures.add(transformFuture); - }); - } - - try { + completableFutures.add(transformFuture); + }); + } CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[eventRecordMap.values().size()])).get(); circulatorContext.offerTargetTaskQueue(afterTransformConnect); logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(afterTransformConnect)); diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java index 3f61b10..85dae3b 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/EventTargetTrigger.java @@ -60,7 +60,7 @@ public class EventTargetTrigger extends ServiceThread { while (!stopped) { Map<String, List<ConnectRecord>> targetRecordMap = circulatorContext.takeTargetRecords(batchSize); if (MapUtils.isEmpty(targetRecordMap)) { - logger.info("current target pusher is empty"); + logger.trace("current target pusher is empty"); this.waitForRunning(1000); continue; } diff --git a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java index f018ce1..0e34f6a 100644 --- a/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java +++ b/adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/service/TargetRunnerConfigOnDBObserver.java @@ -48,7 +48,8 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb @Autowired EventTargetRepository eventTargetRepository; - public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository, EventTargetRepository eventTargetRepository) { + public TargetRunnerConfigOnDBObserver(EventTargetRunnerRepository eventTargetRunnerRepository, + EventTargetRepository eventTargetRepository) { this.eventTargetRunnerRepository = eventTargetRunnerRepository; this.eventTargetRepository = eventTargetRepository; } @@ -56,7 +57,17 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb @Override @Transactional public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() { - List<EventTargetRunner> eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null); + List<EventTargetRunner> eventTargetRunners = null; + try { + eventTargetRunners = eventTargetRunnerRepository.listEventTargetRunners(null, null, null); + } catch (Throwable e) { + if (e.getMessage().contains("not found")) { + return Sets.newHashSet(); + } + } + if (eventTargetRunners == null || eventTargetRunners.isEmpty()) { + return Sets.newHashSet(); + } Set<TargetRunnerConfig> targetRunnerConfigs = Sets.newHashSet(); for (EventTargetRunner eventTargetRunner : eventTargetRunners) { targetRunnerConfigs.add(new Gson().fromJson(eventTargetRunner.getRunContext(), TargetRunnerConfig.class)); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java index 320c69e..1b95789 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/impl/RocketMQEventDataRepository.java @@ -20,7 +20,6 @@ package org.apache.rocketmq.eventbridge.adapter.storage.rocketmq.impl; import com.google.gson.Gson; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO; @@ -93,10 +92,11 @@ public class RocketMQEventDataRepository implements EventDataRepository { @Cacheable(value = "topicCache") @Override public String getTopicName(String accountId, String eventBusName) { - String topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName); - if (StringUtils.isBlank(AppConfig.getGlobalConfig().getDefaultDataPersistentClusterName())) { - return topicName; - } + return getTopicNameWithOutCache(accountId, eventBusName); + } + + @Override public String getTopicNameWithOutCache(String accountId, String eventBusName) { + String topicName = null; EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName); if (eventTopicDO != null) { topicName = eventTopicDO.getName(); diff --git a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java index 56a4550..df9cda7 100644 --- a/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java +++ b/adapter/storage/src/main/java/org/apache/rocketmq/eventbridge/adapter/storage/rocketmq/runtimer/RocketMQEventSubscriber.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.remoting.proxy.SocksProxyConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.DependsOn; import org.springframework.core.io.support.PropertiesLoaderUtils; import org.springframework.stereotype.Component; @@ -69,6 +70,7 @@ import java.util.stream.Collectors; * RocketMQ implement event subscriber */ @Component +@DependsOn("flyway") public class RocketMQEventSubscriber extends EventSubscriber { private static final Logger logger = LoggerFactory.getLogger(RocketMQEventSubscriber.class); @@ -122,7 +124,7 @@ public class RocketMQEventSubscriber extends EventSubscriber { ArrayList<MessageExt> messages = new ArrayList<>(); messageBuffer.drainTo(messages, pullBatchSize); if (CollectionUtils.isEmpty(messages)) { - logger.info("consumer poll message empty."); + logger.trace("consumer poll message empty."); return null; } List<ConnectRecord> connectRecords = Lists.newArrayList(); @@ -251,7 +253,7 @@ public class RocketMQEventSubscriber extends EventSubscriber { } private String getTopicName(SubscribeRunnerKeys subscribeRunnerKeys) { - return eventDataRepository.getTopicName(subscribeRunnerKeys.getAccountId(), subscribeRunnerKeys.getEventBusName()); + return eventDataRepository.getTopicNameWithOutCache(subscribeRunnerKeys.getAccountId(), subscribeRunnerKeys.getEventBusName()); } private String createGroupName(String prefix) { diff --git a/docs/CreateDingTalkTarget.md b/docs/CreateDingTalkTarget.md deleted file mode 100644 index 4446433..0000000 --- a/docs/CreateDingTalkTarget.md +++ /dev/null @@ -1,86 +0,0 @@ - -## Create EventBus - -```text -POST /bus/createEventBus HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ -"eventBusName":"demo-bus", -"description":"a demo bus." -} -``` - -## Create EventSource - -```text -POST /source/createEventSource HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ -"eventBusName":"demo-bus", -"eventSourceName":"demo-source", -"description":"A demo source." -} -``` - -## Create EventRule - -```text -POST /rule/createEventRule HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventBusName":"demo-bus", - "eventRuleName":"demo-rule", - "description":"A demo rule.", - "filterPattern":"{}" -} -``` - -## Create Target - -This is a sample with EventBridge target: - -```text -POST /target/createEventTargets HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventBusName":"demo-bus", - "eventRuleName":"demo-rule", - "eventTargets":[ - { - "eventTargetName":"eventbridge-target", - "className":"acs.eventbridge", - "config":{ - "RegionId":"cn-hangzhou", - "AliyunEventBus":"rocketmq-eventbridge" - } - } - ] -} -``` - -This is a sample with DingTalk target: - -```text -POST /target/createEventTargets HTTP/1.1 -Host: demo.eventbridge.com -Content-Type: application/json; charset=utf-8 -{ - "eventBusName":"demo-bus", - "eventRuleName":"demo-rule", - "eventTargets":[ - { - "eventTargetName":"dingtalk-target", - "className":"acs.dingtalk", - "config":{ - "WebHook":"https://oapi.dingtalk.com/robot/send?access_token=b43a54b702314415c2acdae97eda1e092528b7a9dddb31510a5b4430be2ef867", - "SecretKey":"SEC53483bf496b8f9e0b4ab0ab669d422208e6ccfaedfd5120ea6b8426b9ecd47aa", - "Body":"{\"template\":\"{\\\"text\\\":{\\\"content\\\":\\\"${content}\\\"},\\\"msgtype\\\":\\\"text\\\"}\",\"form\":\"TEMPLATE\",\"value\":\"{\\\"content\\\":\\\"$.data.body\\\"}\"}" - } - } - ] -} -``` diff --git a/docs/CreateFileTarget.md b/docs/CreateFileTarget.md new file mode 100644 index 0000000..53dff84 --- /dev/null +++ b/docs/CreateFileTarget.md @@ -0,0 +1,51 @@ + + +## Create EventBus + +```text +POST /bus/createEventBus HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ +"eventBusName":"demo-bus", +"description":"a demo bus." +} +``` + +## Create EventRule + +```text +POST /rule/createEventRule HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ + "eventBusName":"demo-bus", + "eventRuleName":"demo-rule", + "description":"A demo rule.", + "filterPattern":"{}" +} +``` + +## Create Target + +This is a sample with EventBridge target: + +```text +POST /target/createEventTargets HTTP/1.1 +Host: demo.eventbridge.com +Content-Type: application/json; charset=utf-8 +{ + "eventBusName":"demo-bus", + "eventRuleName":"demo-rule", + "eventTargets":[ + { + "eventTargetName":"demo-target", + "className":"acs.eventbridge", + "config":{ + "fileName":"~/demo", + "line":"{ \"form\":\"JSONPATH\", \"value\":\"$.data\"}" + } + } + ] +} +``` diff --git a/docs/cn/images/demo.png b/docs/cn/images/demo.png new file mode 100644 index 0000000..4f9c1a3 Binary files /dev/null and b/docs/cn/images/demo.png differ diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java index 55a412e..550275d 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java +++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/storage/EventDataRepository.java @@ -59,4 +59,11 @@ public interface EventDataRepository { */ String getTopicName(String accountId, String eventBusName); + /** + * @param accountId + * @param eventBusName + * @return + */ + String getTopicNameWithOutCache(String accountId, String eventBusName); + } diff --git a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java index f85517d..5738a75 100644 --- a/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java +++ b/infrastructure/src/main/java/org/apache/rocketmq/eventbridge/infrastructure/validate/DefaultAuthValidation.java @@ -31,7 +31,7 @@ public class DefaultAuthValidation implements AuthValidation { @Override public Context validate(ServerHttpRequest request, Context ctx) { - String resourceOwnerId = "defaultResourceOwnerId"; + String resourceOwnerId = "default"; List<String> resourceOwnerIds = request.getHeaders().get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID); if (resourceOwnerIds != null && !resourceOwnerIds.isEmpty()) { //throw new EventBridgeException(DefaultErrorCode.LoginFailed); diff --git a/pom.xml b/pom.xml index a855458..2c4ff1c 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,7 @@ <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version> <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version> <rocketmq.version>5.1.0</rocketmq.version> + <flyway.version>8.5.7</flyway.version> </properties> <modules> @@ -148,6 +149,11 @@ <artifactId>rocketmq-eventbridge-infrastructure</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-test-demo</artifactId> + <version>${project.version}</version> + </dependency> <!-- Framework --> <dependency> @@ -320,6 +326,11 @@ <version>${mockito.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.flywaydb</groupId> + <artifactId>flyway-core</artifactId> + <version>${flyway.version}</version> + </dependency> </dependencies> </dependencyManagement> diff --git a/start/pom.xml b/start/pom.xml index 2bf80b0..e1e1995 100644 --- a/start/pom.xml +++ b/start/pom.xml @@ -54,6 +54,10 @@ <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-eventbridge-infrastructure</artifactId> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-test-demo</artifactId> + </dependency> <!-- Framework --> <dependency> <groupId>org.springframework.boot</groupId> diff --git a/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java b/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java index 05344e6..fe8c642 100644 --- a/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java +++ b/start/src/main/java/org/apache/rocketmq/eventbridge/Main.java @@ -18,9 +18,10 @@ package org.apache.rocketmq.eventbridge; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration; import org.springframework.cache.annotation.EnableCaching; -@SpringBootApplication(scanBasePackages = "org.apache.rocketmq.eventbridge.*") +@SpringBootApplication(scanBasePackages = "org.apache.rocketmq.eventbridge.*",exclude = {FlywayAutoConfiguration.class}) @EnableCaching public class Main { public static void main(String[] args) { diff --git a/start/src/main/resources/application.properties b/start/src/main/resources/application.properties index dc13acc..5911b30 100644 --- a/start/src/main/resources/application.properties +++ b/start/src/main/resources/application.properties @@ -24,7 +24,6 @@ spring.profiles.active=local #spring.datasource.hikari.password=xxxxx mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl ## flyway -spring.flyway.placeholderReplacement=false ## rocketmq rocketmq.namesrvAddr=localhost:9876 @@ -33,10 +32,10 @@ rocketmq.cluster.name=DefaultCluster runtime.config.mode=DB runtime.storage.mode=ROCKETMQ rumtime.name=eventbridge-runtimer -runtime.pluginpath=/Users/Local/eventbridge/plugin +runtime.pluginpath=~/eventbridge/plugin ## log app.name=rocketmqeventbridge log.level=INFO -log.path=/Users/Local/logs \ No newline at end of file +log.path=~/logs \ No newline at end of file diff --git a/start/pom.xml b/test/demo/pom.xml similarity index 50% copy from start/pom.xml copy to test/demo/pom.xml index 2bf80b0..80b3bf8 100644 --- a/start/pom.xml +++ b/test/demo/pom.xml @@ -12,20 +12,23 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>rocketmq-eventbridge</artifactId> + <artifactId>rocketmq-eventbridge-test</artifactId> <groupId>org.apache.rocketmq</groupId> <version>1.0.0</version> + <relativePath>../pom.xml</relativePath> </parent> <modelVersion>4.0.0</modelVersion> - - <artifactId>rocketmq-eventbridge-start</artifactId> + <artifactId>rocketmq-test-demo</artifactId> <properties> - <maven.compiler.source>8</maven.compiler.source> - <maven.compiler.target>8</maven.compiler.target> + <jakarta.version>2.1.6</jakarta.version> + <jersey.version>2.34</jersey.version> + <reactor.version>3.4.14</reactor.version> + <httpcore.version>4.4.9</httpcore.version> </properties> + <dependencies> - <!-- Project Version --> + <!-- Project modules --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-eventbridge-adapter-persistence</artifactId> @@ -38,22 +41,10 @@ <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-eventbridge-adapter-api</artifactId> </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-eventbridge-adapter-runtime</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-eventbridge-adapter-storage</artifactId> - </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-eventbridge-domain</artifactId> </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-eventbridge-infrastructure</artifactId> - </dependency> <!-- Framework --> <dependency> <groupId>org.springframework.boot</groupId> @@ -61,71 +52,69 @@ </dependency> <dependency> <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-test</artifactId> - <scope>test</scope> + <artifactId>spring-boot-autoconfigure</artifactId> + </dependency> + <dependency> + <groupId>jakarta.ws.rs</groupId> + <artifactId>jakarta.ws.rs-api</artifactId> + <version>${jakarta.version}</version> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-common</artifactId> + <version>${jersey.version}</version> </dependency> - <!--Tools --> + + <!-- tools --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> - <groupId>io.springfox</groupId> - <artifactId>springfox-swagger2</artifactId> + <groupId>io.cloudevents</groupId> + <artifactId>cloudevents-json-jackson</artifactId> + <version>${cloudevents.version}</version> </dependency> <dependency> - <groupId>io.springfox</groupId> - <artifactId>springfox-swagger-ui</artifactId> + <artifactId>cloudevents-http-basic</artifactId> + <groupId>io.cloudevents</groupId> + <version>${cloudevents.version}</version> </dependency> <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-validation</artifactId> + <groupId>io.cloudevents</groupId> + <artifactId>cloudevents-http-restful-ws</artifactId> + <version>${cloudevents.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>${httpcore.version}</version> + </dependency> + <!-- Test --> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + <version>${reactor.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.hibernate.validator</groupId> + <artifactId>hibernate-validator</artifactId> </dependency> <dependency> - <groupId>com.alibaba</groupId> - <artifactId>fastjson</artifactId> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-aop</artifactId> </dependency> </dependencies> - <build> - <finalName>rocketmq-eventbridge</finalName> - <resources> - <resource> - <directory>src/main/resources</directory> - <filtering>true</filtering> - <includes> - <include>logback.xml</include> - <include>application.properties</include> - <include>**/*.xml</include> - </includes> - </resource> - <resource> - <directory>src/main/java</directory> - <includes> - <include>**/*.properties</include> - <include>**/*.xml</include> - <include>**/*.tld</include> - </includes> - <filtering>false</filtering> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-maven-plugin</artifactId> - <version>2.2.10.RELEASE</version> - <executions> - <execution> - <id>repackage</id> - <goals> - <goal>repackage</goal> - </goals> - <configuration> - <mainClass>org.apache.rocketmq.eventbridge.Main</mainClass> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> </project> \ No newline at end of file diff --git a/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java b/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java new file mode 100644 index 0000000..ce71c7a --- /dev/null +++ b/test/demo/src/main/java/org/apache/rocketmq/eventbridge/demo/DefaultDemo.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.demo; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; +import javax.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode; +import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService; +import org.apache.rocketmq.eventbridge.domain.model.rule.EventRuleService; +import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget; +import org.apache.rocketmq.eventbridge.domain.model.target.EventTargetService; +import org.apache.rocketmq.eventbridge.exception.EventBridgeException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@DependsOn("flyway") +public class DefaultDemo { + + @Autowired + EventBusService eventBusService; + + @Autowired + EventRuleService eventRuleService; + + @Autowired + EventTargetService eventTargetService; + + private static final String DEFAULT_ACCOUNT_ID = "default"; + + private static final String DEFAULT_EVENT_TOPIC_NAME = "demo-bus"; + + private static final String DEFAULT_EVENT_RULE_NAME = "demo-rule"; + + private static final String DEFAULT_EVENT_TARGET_NAME = "demo-target"; + + private static final String DEFAULT_EVENT_TARGET_CLASS = "file"; + + @PostConstruct + public void initDemo() { + log.info("init demo"); + initEventBus(); + initEventRule(); + intEventTarget(); + + } + + private void initEventBus() { + try { + eventBusService.getEventBus(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME); + } catch (EventBridgeException e) { + if (EventBridgeErrorCode.EventBusNotExist.getCode().equals(e.getCode())) { + eventBusService.createEventBus(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, "A demo bus."); + log.info("Create demo eventbus:{}", DEFAULT_EVENT_TOPIC_NAME); + } + } + + } + + private void initEventRule() { + try { + eventRuleService.getEventRule(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME); + } catch (EventBridgeException e) { + if (EventBridgeErrorCode.EventRuleNotExist.getCode().equals(e.getCode())) { + eventRuleService.createEventRule(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME, "A demo rule.", "{}"); + log.info("Create demo event rule:{}", DEFAULT_EVENT_RULE_NAME); + } + } + } + + private void intEventTarget() { + List<EventTarget> eventTargets = eventTargetService.listTargets(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME); + if (eventTargets == null || eventTargets.isEmpty()) { + List<EventTarget> eventTargetList = Lists.newArrayList(); + Map<String, Object> config = Maps.newHashMap(); + config.put("fileName", System.getProperty("user.home") + "/demo"); + config.put("line", "{\"form\":\"JSONPATH\",\"value\":\"$.data\"}"); + EventTarget eventTarget = EventTarget.builder().name(DEFAULT_EVENT_TARGET_NAME).className(DEFAULT_EVENT_TARGET_CLASS).config(config).build(); + eventTargetList.add(eventTarget); + eventTargetService.createTargets(DEFAULT_ACCOUNT_ID, DEFAULT_EVENT_TOPIC_NAME, DEFAULT_EVENT_RULE_NAME, eventTargetList); + } + } + +} \ No newline at end of file diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java new file mode 100644 index 0000000..bdfc870 --- /dev/null +++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.api.controller; + +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import javax.validation.ConstraintViolation; +import javax.validation.Validator; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.CreateApiDestinationRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.CreateApiDestinationResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.DeleteApiDestinationRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.DeleteApiDestinationResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.GetApiDestinationRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.GetApiDestinationResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.ListApiDestinationsRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.ListApiDestinationsResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.UpdateApiDestinationRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.apidestination.UpdateApiDestinationResponse; +import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode; +import org.apache.rocketmq.eventbridge.domain.model.PaginationResult; +import org.apache.rocketmq.eventbridge.domain.model.apidestination.ApiDestinationDTO; +import org.apache.rocketmq.eventbridge.domain.model.apidestination.ApiDestinationService; +import org.apache.rocketmq.eventbridge.domain.model.apidestination.parameter.HttpApiParameters; +import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import reactor.core.publisher.Mono; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; + +@RunWith(MockitoJUnitRunner.class) +public class ApiDestinationDTOControllerTest { + + @InjectMocks + private ApiDestinationController apiDestinationController; + @Mock + private ApiDestinationService apiDestinationService; + @Mock + private Validator validator; + @Mock + private AccountAPI accountAPI; + + @Before + public void testBefore() { + Mockito.when(accountAPI.getResourceOwnerAccountId(any())) + .thenReturn(UUID.randomUUID() + .toString()); + } + + @Test + public void testCreateApiDestination() { + Mockito.when(apiDestinationService.createApiDestination(any())) + .thenReturn(UUID.randomUUID() + .toString()); + CreateApiDestinationRequest createApiDestinationRequest = new CreateApiDestinationRequest(); + createApiDestinationRequest.setApiDestinationName(UUID.randomUUID() + .toString()); + createApiDestinationRequest.setDescription(UUID.randomUUID() + .toString()); + HttpApiParameters httpApiParameters = new HttpApiParameters(); + httpApiParameters.setEndpoint(UUID.randomUUID() + .toString()); + httpApiParameters.setMethod(UUID.randomUUID() + .toString()); + createApiDestinationRequest.setHttpApiParameters(httpApiParameters); + createApiDestinationRequest.setInvocationRateLimitPerSecond(11); + final Mono<CreateApiDestinationResponse> apiDestination = apiDestinationController.createApiDestination( + createApiDestinationRequest); + Assert.assertEquals(apiDestination.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testUpdateApiDestination() { + Set<ConstraintViolation<UpdateApiDestinationRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class))) + .thenReturn(constraintViolations); + Mockito.when(apiDestinationService.updateApiDestination(any())) + .thenReturn(Boolean.TRUE); + UpdateApiDestinationRequest updateApiDestinationRequest = new UpdateApiDestinationRequest(); + updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID() + .toString()); + updateApiDestinationRequest.setDescription(UUID.randomUUID() + .toString()); + HttpApiParameters httpApiParameters = new HttpApiParameters(); + httpApiParameters.setEndpoint(UUID.randomUUID() + .toString()); + httpApiParameters.setMethod(UUID.randomUUID() + .toString()); + updateApiDestinationRequest.setHttpApiParameters(httpApiParameters); + updateApiDestinationRequest.setInvocationRateLimitPerSecond(11); + final Mono<UpdateApiDestinationResponse> updateApiDestinationResponse + = apiDestinationController.updateApiDestination(updateApiDestinationRequest); + Assert.assertEquals(updateApiDestinationResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testGetApiDestination() { + Set<ConstraintViolation<GetApiDestinationRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(GetApiDestinationRequest.class))) + .thenReturn(constraintViolations); + ApiDestinationDTO eventApiDestinationDTO = new ApiDestinationDTO(); + eventApiDestinationDTO.setName(UUID.randomUUID() + .toString()); + eventApiDestinationDTO.setGmtCreate(new Date()); + Mockito.when(apiDestinationService.getApiDestination(any(), any())) + .thenReturn(eventApiDestinationDTO); + GetApiDestinationRequest getApiDestinationRequest = new GetApiDestinationRequest(); + getApiDestinationRequest.setApiDestinationName(UUID.randomUUID() + .toString()); + final Mono<GetApiDestinationResponse> apiDestination = apiDestinationController.getApiDestination( + getApiDestinationRequest); + Assert.assertEquals(apiDestination.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testDeleteApiDestination() { + Set<ConstraintViolation<DeleteApiDestinationRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class))) + .thenReturn(constraintViolations); + Mockito.when(apiDestinationService.deleteApiDestination(any(), any())) + .thenReturn(Boolean.TRUE); + DeleteApiDestinationRequest deleteApiDestinationRequest = new DeleteApiDestinationRequest(); + deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID() + .toString()); + final Mono<DeleteApiDestinationResponse> deleteApiDestinationResponse + = apiDestinationController.deleteApiDestination(deleteApiDestinationRequest); + Assert.assertEquals(deleteApiDestinationResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testListApiDestinations() { + Set<ConstraintViolation<ListApiDestinationsRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(ListApiDestinationsRequest.class))) + .thenReturn(constraintViolations); + PaginationResult<List<ApiDestinationDTO>> result = new PaginationResult(); + List<ApiDestinationDTO> apiDestinationDTOList = Lists.newArrayList(); + ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO(); + apiDestinationDTO.setName(UUID.randomUUID() + .toString()); + apiDestinationDTO.setGmtCreate(new Date()); + apiDestinationDTOList.add(apiDestinationDTO); + result.setData(apiDestinationDTOList); + result.setTotal(9); + result.setNextToken("0"); + Mockito.when(apiDestinationService.listApiDestinations(any(), any(), any(), anyInt())) + .thenReturn(result); + ListApiDestinationsRequest listApiDestinationsRequest = new ListApiDestinationsRequest(); + listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID() + .toString()); + listApiDestinationsRequest.setNextToken("0"); + listApiDestinationsRequest.setMaxResults(10); + final Mono<ListApiDestinationsResponse> listApiDestinationsResponse + = apiDestinationController.listApiDestinations(listApiDestinationsRequest); + Assert.assertEquals(listApiDestinationsResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } +} diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java new file mode 100644 index 0000000..dd2e27f --- /dev/null +++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.api.controller; + +import com.google.common.collect.Lists; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import javax.validation.ConstraintViolation; +import javax.validation.Validator; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.CreateConnectionRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.CreateConnectionResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.DeleteConnectionRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.DeleteConnectionResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.GetConnectionRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.GetConnectionResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListConnectionRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListConnectionResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.ListEnumsResponse; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnectionRequest; +import org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnectionResponse; +import org.apache.rocketmq.eventbridge.domain.common.enums.AuthorizationTypeEnum; +import org.apache.rocketmq.eventbridge.domain.common.enums.NetworkTypeEnum; +import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode; +import org.apache.rocketmq.eventbridge.domain.model.PaginationResult; +import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionDTO; +import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService; +import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.AuthParameters; +import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.BasicAuthParameters; +import org.apache.rocketmq.eventbridge.domain.model.connection.parameter.NetworkParameters; +import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.BDDMockito; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import reactor.core.publisher.Mono; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; + +@RunWith(MockitoJUnitRunner.class) +public class ConnectionControllerTest { + + @InjectMocks + private ConnectionController connectionController; + @Mock + private ConnectionService connectionService; + @Mock + private Validator validator; + @Mock + private AccountAPI accountAPI; + + @Before + public void testBefore() throws Exception { + Mockito.when(accountAPI.getResourceOwnerAccountId(any())) + .thenReturn(UUID.randomUUID() + .toString()); + } + + @Test + public void testCreateConnection() { + Mockito.when(connectionService.createConnection(any(ConnectionDTO.class))) + .thenReturn(UUID.randomUUID() + .toString()); + Set<ConstraintViolation<CreateConnectionRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(CreateConnectionRequest.class))) + .thenReturn(constraintViolations); + CreateConnectionRequest createConnectionRequest = new CreateConnectionRequest(); + createConnectionRequest.setConnectionName(UUID.randomUUID() + .toString()); + createConnectionRequest.setDescription(UUID.randomUUID() + .toString()); + NetworkParameters networkParameters = new NetworkParameters(); + networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType()); + networkParameters.setSecurityGroupId(UUID.randomUUID() + .toString()); + networkParameters.setVpcId(UUID.randomUUID() + .toString()); + networkParameters.setVswitcheId(UUID.randomUUID() + .toString()); + createConnectionRequest.setNetworkParameters(networkParameters); + AuthParameters authParameters = new AuthParameters(); + BasicAuthParameters basicAuthParameters = new BasicAuthParameters(); + basicAuthParameters.setPassword(UUID.randomUUID() + .toString()); + basicAuthParameters.setUsername(UUID.randomUUID() + .toString()); + authParameters.setBasicAuthParameters(basicAuthParameters); + authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType()); + createConnectionRequest.setAuthParameters(authParameters); + final Mono<CreateConnectionResponse> connection = connectionController.createConnection( + createConnectionRequest); + Assert.assertEquals(connection.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testDeleteConnection() { + Mockito.doNothing() + .when(connectionService) + .deleteConnection(anyString(), anyString()); + Set<ConstraintViolation<DeleteConnectionRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(DeleteConnectionRequest.class))) + .thenReturn(constraintViolations); + DeleteConnectionRequest deleteConnectionRequest = new DeleteConnectionRequest(); + deleteConnectionRequest.setConnectionName(UUID.randomUUID() + .toString()); + final Mono<DeleteConnectionResponse> deleteConnectionResponse = connectionController.deleteConnection( + deleteConnectionRequest); + Assert.assertEquals(deleteConnectionResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testUpdateConnection() { + Mockito.doNothing() + .when(connectionService) + .updateConnection(any(ConnectionDTO.class), anyString()); + Set<ConstraintViolation<UpdateConnectionRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(UpdateConnectionRequest.class))) + .thenReturn(constraintViolations); + UpdateConnectionRequest updateConnectionRequest = new UpdateConnectionRequest(); + updateConnectionRequest.setConnectionName(UUID.randomUUID() + .toString()); + updateConnectionRequest.setDescription(UUID.randomUUID() + .toString()); + NetworkParameters networkParameters = new NetworkParameters(); + networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType()); + networkParameters.setSecurityGroupId(UUID.randomUUID() + .toString()); + networkParameters.setVpcId(UUID.randomUUID() + .toString()); + networkParameters.setVswitcheId(UUID.randomUUID() + .toString()); + updateConnectionRequest.setNetworkParameters(networkParameters); + AuthParameters authParameters = new AuthParameters(); + BasicAuthParameters basicAuthParameters = new BasicAuthParameters(); + basicAuthParameters.setPassword(UUID.randomUUID() + .toString()); + basicAuthParameters.setUsername(UUID.randomUUID() + .toString()); + authParameters.setBasicAuthParameters(basicAuthParameters); + authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType()); + updateConnectionRequest.setAuthParameters(authParameters); + final Mono<UpdateConnectionResponse> updateConnectionResponse = connectionController.updateConnection( + updateConnectionRequest); + Assert.assertEquals(updateConnectionResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testGetConnection() { + Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(GetConnectionRequest.class))) + .thenReturn(constraintViolations); + final ConnectionDTO connectionDTO = new ConnectionDTO(); + NetworkParameters networkParameters = new NetworkParameters(); + networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType()); + networkParameters.setSecurityGroupId(UUID.randomUUID() + .toString()); + networkParameters.setVpcId(UUID.randomUUID() + .toString()); + networkParameters.setVswitcheId(UUID.randomUUID() + .toString()); + connectionDTO.setNetworkParameters(networkParameters); + connectionDTO.setGmtCreate(new Date()); + List<ConnectionDTO> list = Lists.newArrayList(); + list.add(connectionDTO); + AuthParameters authParameters = new AuthParameters(); + BasicAuthParameters basicAuthParameters = new BasicAuthParameters(); + basicAuthParameters.setPassword(UUID.randomUUID() + .toString()); + basicAuthParameters.setUsername(UUID.randomUUID() + .toString()); + authParameters.setBasicAuthParameters(basicAuthParameters); + authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType()); + connectionDTO.setAuthParameters(authParameters); + BDDMockito.given(connectionService.getConnection(any(), any())) + .willReturn(list); + GetConnectionRequest getConnectionRequest = new GetConnectionRequest(); + getConnectionRequest.setConnectionName(UUID.randomUUID() + .toString()); + final Mono<GetConnectionResponse> getConnectionResponse = connectionController.getConnection( + getConnectionRequest); + Assert.assertEquals(getConnectionResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testSelectOneConnection() { + Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(GetConnectionRequest.class))) + .thenReturn(constraintViolations); + final ConnectionDTO connectionDTO = new ConnectionDTO(); + NetworkParameters networkParameters = new NetworkParameters(); + networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType()); + networkParameters.setSecurityGroupId(UUID.randomUUID() + .toString()); + networkParameters.setVpcId(UUID.randomUUID() + .toString()); + networkParameters.setVswitcheId(UUID.randomUUID() + .toString()); + connectionDTO.setNetworkParameters(networkParameters); + connectionDTO.setGmtCreate(new Date()); + List<ConnectionDTO> list = Lists.newArrayList(); + list.add(connectionDTO); + AuthParameters authParameters = new AuthParameters(); + BasicAuthParameters basicAuthParameters = new BasicAuthParameters(); + basicAuthParameters.setPassword(UUID.randomUUID() + .toString()); + basicAuthParameters.setUsername(UUID.randomUUID() + .toString()); + authParameters.setBasicAuthParameters(basicAuthParameters); + authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType()); + connectionDTO.setAuthParameters(authParameters); + BDDMockito.given(connectionService.getConnection(any(), any())) + .willReturn(list); + GetConnectionRequest getConnectionRequest = new GetConnectionRequest(); + getConnectionRequest.setConnectionName(UUID.randomUUID() + .toString()); + final Mono<GetConnectionResponse> getConnectionResponse = connectionController.selectOneConnection( + getConnectionRequest); + Assert.assertEquals(getConnectionResponse.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testListConnections() { + PaginationResult<List<ConnectionDTO>> result = new PaginationResult(); + List<ConnectionDTO> eventConnectionWithBLOBs = Lists.newArrayList(); + ConnectionDTO eventConnection = new ConnectionDTO(); + eventConnection.setConnectionName(UUID.randomUUID() + .toString()); + eventConnection.setGmtCreate(new Date()); + eventConnectionWithBLOBs.add(eventConnection); + result.setData(eventConnectionWithBLOBs); + result.setTotal(9); + result.setNextToken("0"); + Mockito.when(connectionService.listConnections(any(), any(), any(), anyInt())) + .thenReturn(result); + Set<ConstraintViolation<ListConnectionRequest>> constraintViolations = new HashSet<>(); + Mockito.when(validator.validate(any(ListConnectionRequest.class))) + .thenReturn(constraintViolations); + ListConnectionRequest listConnectionRequest = new ListConnectionRequest(); + listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID() + .toString()); + listConnectionRequest.setNextToken("0"); + listConnectionRequest.setMaxResults(10); + final Mono<ListConnectionResponse> listConnections = connectionController.listConnections( + listConnectionRequest); + Assert.assertEquals(listConnections.block() + .getCode(), EventBridgeErrorCode.Success.getCode()); + } + + @Test + public void testListEnumsResponse() { + final Mono<ListEnumsResponse> listEnumsResponse = connectionController.listEnumsResponse(); + Assert.assertEquals(listEnumsResponse.block() + .getNetworkTypeEnums() + .size(), NetworkTypeEnum.values().length); + } +} diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java new file mode 100644 index 0000000..8e617ef --- /dev/null +++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventConverterAdapterTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.api.converter; + +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import io.cloudevents.CloudEvent; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.apache.http.protocol.HTTP.CONTENT_TYPE; + +@RunWith(MockitoJUnitRunner.class) +public class EventConverterAdapterTest { + @InjectMocks + private EventConverterAdapter eventConverterAdapter; + + @Before + public void before() { + eventConverterAdapter.getEventConverterList() + .add(new CloudEventBatchedConverter()); + eventConverterAdapter.getEventConverterList() + .add(new CloudEventBinaryConverter()); + eventConverterAdapter.getEventConverterList() + .add(new CloudEventStructuredConverter()); + } + + @Test + public void toEventsRequest_Binary() { + Map<String, String> headers = Maps.newHashMap(); + headers.put(CONTENT_TYPE, "application/json"); + headers.put("ce-id", UUID.randomUUID() + .toString()); + headers.put("ce-source", "demo-source"); + headers.put("ce-type", URI.create("demo:type") + .toString()); + headers.put("ce-specversion", "1.0"); + byte[] body = new String("{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}").getBytes(StandardCharsets.UTF_8); + List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, body); + Assert.assertEquals(1, cloudEventList.size()); + Assert.assertEquals("demo:type", cloudEventList.get(0) + .getType()); + } + + @Test + public void toEventsRequest_Structured() { + Map<String, String> headers = Maps.newHashMap(); + headers.put(CONTENT_TYPE, "application/cloudevents+json"); + + Map<String, Object> cloudEvent = Maps.newHashMap(); + cloudEvent.put("id", UUID.randomUUID() + .toString()); + cloudEvent.put("source", "demo-source"); + cloudEvent.put("type", URI.create("demo:type")); + cloudEvent.put("specversion", "1.0"); + cloudEvent.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}"); + List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, new Gson().toJson(cloudEvent) + .getBytes(StandardCharsets.UTF_8)); + + Assert.assertEquals(1, cloudEventList.size()); + Assert.assertEquals("demo:type", cloudEventList.get(0) + .getType()); + } + + @Test + public void toEventsRequest_Batched() { + Map<String, String> headers = Maps.newHashMap(); + headers.put(CONTENT_TYPE, "application/cloudevents-batch+json"); + + Map<String, Object> cloudEvent1 = Maps.newHashMap(); + cloudEvent1.put("id", UUID.randomUUID() + .toString()); + cloudEvent1.put("source", "demo-source"); + cloudEvent1.put("type", URI.create("demo:type")); + cloudEvent1.put("specversion", "1.0"); + cloudEvent1.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}"); + + Map<String, Object> cloudEvent2 = Maps.newHashMap(); + cloudEvent2.put("id", UUID.randomUUID() + .toString()); + cloudEvent2.put("source", "demo-source"); + cloudEvent2.put("type", URI.create("demo:type")); + cloudEvent2.put("specversion", "1.0"); + cloudEvent2.put("data", "{\n" + "\t\"a\":1,\n" + "\t\"b\":2\n" + "}"); + + List<CloudEvent> cloudEventList = eventConverterAdapter.toEventsRequest(headers, + new Gson().toJson(Arrays.asList(cloudEvent1, cloudEvent2)) + .getBytes(StandardCharsets.UTF_8)); + + Assert.assertEquals(2, cloudEventList.size()); + Assert.assertEquals("demo:type", cloudEventList.get(0) + .getType()); + } + +} \ No newline at end of file diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java new file mode 100644 index 0000000..3b1b45c --- /dev/null +++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetConverterTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.eventbridge.adapter.api.converter; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.rocketmq.eventbridge.adapter.api.dto.target.DeadLetterQueueDTO; +import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO; +import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RetryStrategyDTO; +import org.apache.rocketmq.eventbridge.adapter.api.dto.target.RunOptionsDTO; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; +import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue; +import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; +import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget; +import org.junit.Assert; +import org.junit.Test; + +public class EventTargetConverterTest { + + @Test + public void convertEventTargetRunners() { + + } + + @Test + public void convertEventTargetRunner() { + EventTargetDTO eventTargetDTO = new EventTargetDTO(); + eventTargetDTO.setEventTargetName("targetName"); + Map<String, Object> config = Maps.newHashMap(); + config.put("url", "http://127.0.0.1:7001/cloudevents"); + eventTargetDTO.setConfig(config); + eventTargetDTO.setClassName("http"); + + RunOptionsDTO runOptionsDTO = new RunOptionsDTO(); + runOptionsDTO.setErrorsTolerance("NONE"); + runOptionsDTO.setDeadLetterQueue(this.buildDeadLetterQueueDTO()); + runOptionsDTO.setRetryStrategy(this.buildRetryStrategyDTO()); + eventTargetDTO.setRunOptions(runOptionsDTO); + + EventTarget eventTarget = EventTargetConverter.convertEventTarget("123456", "bus", "rule", eventTargetDTO); + + Assert.assertEquals(eventTarget.getAccountId(), "123456"); + Assert.assertEquals(eventTarget.getEventBusName(), "bus"); + Assert.assertEquals(eventTarget.getEventRuleName(), "rule"); + Assert.assertEquals(eventTarget.getName(), eventTargetDTO.getEventTargetName()); + Assert.assertEquals(eventTarget.getClassName(), eventTargetDTO.getClassName()); + Assert.assertEquals(eventTarget.getConfig(), eventTargetDTO.getConfig()); + } + + @Test + public void convertRetryStrategy() { + RetryStrategyDTO retryStrategyDTO = buildRetryStrategyDTO(); + RetryStrategy retryStrategy = EventTargetConverter.convertRetryStrategy(retryStrategyDTO); + + Assert.assertEquals(PushRetryStrategyEnum.BACKOFF_RETRY, retryStrategy.getPushRetryStrategy()); + Assert.assertEquals(retryStrategyDTO.getMaximumRetryAttempts(), retryStrategy.getMaximumRetryAttempts()); + Assert.assertEquals(retryStrategyDTO.getMaximumEventAgeInSeconds(), + retryStrategy.getMaximumEventAgeInSeconds()); + } + + @Test + public void convertErrorTolerance() { + ErrorToleranceEnum errorToleranceEnum = EventTargetConverter.convertErrorTolerance("NONE"); + Assert.assertEquals(ErrorToleranceEnum.NONE, errorToleranceEnum); + } + + @Test + public void convertDeadLetterQueue() { + DeadLetterQueueDTO deadLetterQueueDTO = this.buildDeadLetterQueueDTO(); + DeadLetterQueue deadLetterQueue = EventTargetConverter.convertDeadLetterQueue(deadLetterQueueDTO); + Assert.assertEquals(deadLetterQueue.getType(), deadLetterQueueDTO.getType()); + Assert.assertEquals(deadLetterQueue.getConfig(), deadLetterQueueDTO.getConfig()); + } + + private RetryStrategyDTO buildRetryStrategyDTO() { + RetryStrategyDTO retryStrategyDTO = new RetryStrategyDTO(); + retryStrategyDTO.setPushRetryStrategy("BACKOFF_RETRY"); + retryStrategyDTO.setMaximumRetryAttempts(3); + retryStrategyDTO.setMaximumEventAgeInSeconds(4); + return retryStrategyDTO; + } + + private DeadLetterQueueDTO buildDeadLetterQueueDTO() { + DeadLetterQueueDTO deadLetterQueueDTO = new DeadLetterQueueDTO(); + deadLetterQueueDTO.setType("rocketmq"); + Map<String, Object> config = Maps.newHashMap(); + config.put("topic", "demo"); + config.put("nameSrv", "127.0.01:9876"); + deadLetterQueueDTO.setConfig(config); + return deadLetterQueueDTO; + } +} \ No newline at end of file diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java new file mode 100644 index 0000000..79445e6 --- /dev/null +++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/converter/EventTargetDTOConverterTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.api.converter; + +import java.util.Collections; +import org.apache.rocketmq.eventbridge.adapter.api.dto.target.EventTargetDTO; +import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum; +import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum; +import org.apache.rocketmq.eventbridge.domain.model.run.DeadLetterQueue; +import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy; +import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions; +import org.apache.rocketmq.eventbridge.domain.model.target.EventTarget; +import org.junit.Assert; +import org.junit.Test; + +public class EventTargetDTOConverterTest { + + @Test + public void convert() { + RetryStrategy retryStrategy = RetryStrategy.builder() + .pushRetryStrategy(PushRetryStrategyEnum.BACKOFF_RETRY) + .maximumRetryAttempts(3) + .maximumEventAgeInSeconds(4) + .build(); + DeadLetterQueue deadLetterQueue = DeadLetterQueue.builder() + .type("rocketmq") + .config(Collections.singletonMap("topic", "demo")) + .build(); + RunOptions runOptions = RunOptions.builder() + .errorsTolerance(ErrorToleranceEnum.ALL) + .retryStrategy(retryStrategy) + .deadLetterQueue(deadLetterQueue) + .build(); + EventTarget eventTarget = EventTarget.builder() + .eventBusName("bus") + .eventRuleName("rule") + .name("target") + .config(Collections.singletonMap("url", "http://127.0.0.1:7002/cloudevent")) + .className("http") + .runOptions(runOptions) + .build(); + + EventTargetDTO eventTargetDTO = EventTargetDTOConverter.convert(eventTarget); + Assert.assertEquals(eventTarget.getName(), eventTargetDTO.getEventTargetName()); + Assert.assertEquals(eventTarget.getClassName(), eventTargetDTO.getClassName()); + Assert.assertEquals(eventTarget.getConfig(), eventTargetDTO.getConfig()); + + Assert.assertEquals(eventTarget.getRunOptions() + .getErrorsTolerance() + .toString(), eventTargetDTO.getRunOptions() + .getErrorsTolerance()); + + Assert.assertEquals(eventTarget.getRunOptions() + .getRetryStrategy() + .getPushRetryStrategy() + .toString(), eventTargetDTO.getRunOptions() + .getRetryStrategy() + .getPushRetryStrategy()); + Assert.assertEquals(eventTarget.getRunOptions() + .getRetryStrategy() + .getMaximumRetryAttempts(), eventTargetDTO.getRunOptions() + .getRetryStrategy() + .getMaximumRetryAttempts()); + Assert.assertEquals(eventTarget.getRunOptions() + .getRetryStrategy() + .getMaximumEventAgeInSeconds(), eventTargetDTO.getRunOptions() + .getRetryStrategy() + .getMaximumEventAgeInSeconds()); + + Assert.assertEquals(eventTarget.getRunOptions() + .getDeadLetterQueue() + .getType(), eventTargetDTO.getRunOptions() + .getDeadLetterQueue() + .getType()); + Assert.assertEquals(eventTarget.getRunOptions() + .getDeadLetterQueue() + .getConfig(), eventTargetDTO.getRunOptions() + .getDeadLetterQueue() + .getConfig()); + } +} \ No newline at end of file diff --git a/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java new file mode 100644 index 0000000..fe5b718 --- /dev/null +++ b/test/demo/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/handler/EventDataHandlerTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.api.handler; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import lombok.SneakyThrows; +import org.apache.rocketmq.eventbridge.adapter.api.dto.data.PutEventsResponse; +import org.apache.rocketmq.eventbridge.domain.model.data.EventDataService; +import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback; +import org.apache.rocketmq.eventbridge.domain.model.data.PutEventsResponseEntry; +import org.apache.rocketmq.eventbridge.event.EventBridgeEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import reactor.core.publisher.Mono; + +import static org.mockito.ArgumentMatchers.any; + +@RunWith(MockitoJUnitRunner.class) +public class EventDataHandlerTest { + @InjectMocks + private EventDataHandler eventDataHandler; + + @Mock + EventDataService eventDataService; + + ExecutorService executor = Executors.newFixedThreadPool(10); + + @Before + public void before() { + Mockito.doAnswer((invocation) -> { + Object[] args = invocation.getArguments(); + EventBridgeEvent event = (EventBridgeEvent) args[1]; + ReactorPutEventCallback callback = (ReactorPutEventCallback) args[2]; + executor.submit(new PutEventTestThread(event, callback)); + return null; + }) + .when(eventDataService) + .putEvent(any(), any(), any()); + } + + @Test + public void testPutEvents() { + Long startTime = System.currentTimeMillis(); + List<EventBridgeEvent> eventList = IntStream.range(0, 10) + .mapToObj(index -> { + EventBridgeEvent event = new EventBridgeEvent(); + event.setId(UUID.randomUUID() + .toString()); + return event; + }) + .collect(Collectors.toList()); + Mono<PutEventsResponse> mono = eventDataHandler.putEvents("123456", eventList); + PutEventsResponse putEventsResponse = mono.block(); + Long costTime = System.currentTimeMillis() - startTime; + Assert.assertEquals(10, putEventsResponse.getEntryList() + .size()); + System.out.println("costTime:" + costTime); + Assert.assertEquals(true, costTime < 4000); + } + + class PutEventTestThread implements Runnable { + EventBridgeEvent event; + PutEventCallback putEventCallback; + + public PutEventTestThread(EventBridgeEvent event, PutEventCallback putEventCallback) { + this.event = event; + this.putEventCallback = putEventCallback; + } + + @SneakyThrows + @Override + public void run() { + Thread.sleep(3000L); + PutEventsResponseEntry putEventsResponseEntry = new PutEventsResponseEntry(); + putEventsResponseEntry.setEventId(event.getId()); + if (System.currentTimeMillis() % 2 == 1) { + putEventsResponseEntry.setErrorCode("Success"); + } else { + putEventsResponseEntry.setErrorCode("Failed."); + } + + putEventCallback.endProcess(putEventsResponseEntry); + } + } + +} diff --git a/test/pom.xml b/test/pom.xml new file mode 100644 index 0000000..12f4a80 --- /dev/null +++ b/test/pom.xml @@ -0,0 +1,28 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-eventbridge</artifactId> + <groupId>org.apache.rocketmq</groupId> + <version>1.0.0</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>rocketmq-eventbridge-test</artifactId> + <packaging>pom</packaging> + + <modules> + <module>demo</module> + </modules> + +</project> \ No newline at end of file
