This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/main by this push:
new db61a46 feat:Modify API to Mono structure (#27)
db61a46 is described below
commit db61a464321c507ce8eb960029fbaf0445e22fa5
Author: shenlin <[email protected]>
AuthorDate: Tue Sep 6 10:31:53 2022 +0800
feat:Modify API to Mono structure (#27)
feat:Modify API to Mono structure
---
.../api/controller/ApiDestinationController.java | 162 ++++++++++++-------
.../api/controller/ConnectionController.java | 176 +++++++++++++--------
.../adapter/api/controller/EventBusController.java | 65 +++++---
.../api/controller/EventDataController.java | 29 ++--
.../api/controller/EventRuleController.java | 139 +++++++++-------
.../api/controller/EventSourceController.java | 120 ++++++++------
.../api/controller/EventTargetController.java | 74 +++++----
.../api/controller/EventTypeController.java | 42 ++---
.../ApiDestinationDTOControllerTest.java | 102 ++++++++----
.../api/controller/ConnectionControllerTest.java | 143 +++++++++++------
...acs_mns.sql => V4__register_source_acs_mns.sql} | 0
...sql => V6__register_target_acs_eventbridge.sql} | 0
adapter/rpc/pom.xml | 6 +
.../adapter/rpc/impl/AccountAPIImpl.java | 20 +--
.../exception/code/DefaultErrorCode.java | 1 +
.../eventbridge/domain/rpc/AccountAPI.java | 31 +++-
.../rocketmq/eventbridge/filter/LoginFilter.java | 35 +++-
17 files changed, 728 insertions(+), 417 deletions(-)
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
index a5d32a9..f18e6db 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
@@ -41,6 +41,8 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
import javax.annotation.Resource;
import javax.validation.ConstraintViolation;
@@ -62,83 +64,137 @@ public class ApiDestinationController {
@WebLog
@PostMapping("createApiDestination")
- public CreateApiDestinationResponse createApiDestination(@RequestBody
CreateApiDestinationRequest createApiDestinationRequest) {
- final Set<ConstraintViolation<CreateApiDestinationRequest>> validate =
validator.validate(createApiDestinationRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new
CreateApiDestinationResponse(null).parameterCheckFailRes(errMessage.toString());
- }
- ApiDestinationDTO apiDestinationDTO =
getEventApiDestination(createApiDestinationRequest.getHttpApiParameters(),
createApiDestinationRequest.getDescription(),
createApiDestinationRequest.getConnectionName(),
createApiDestinationRequest.getInvocationRateLimitPerSecond(),
createApiDestinationRequest.getApiDestinationName(), accountAPI);
- return new
CreateApiDestinationResponse(apiDestinationService.createApiDestination(apiDestinationDTO)).success();
+ public Mono<CreateApiDestinationResponse> createApiDestination(
+ @RequestBody CreateApiDestinationRequest createApiDestinationRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<CreateApiDestinationRequest>>
validate = validator.validate(
+ createApiDestinationRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new
CreateApiDestinationResponse(null).parameterCheckFailRes(errMessage.toString());
+ }
+ ApiDestinationDTO apiDestinationDTO = getEventApiDestination(
+ createApiDestinationRequest.getHttpApiParameters(),
createApiDestinationRequest.getDescription(),
+ createApiDestinationRequest.getConnectionName(),
+
createApiDestinationRequest.getInvocationRateLimitPerSecond(),
+ createApiDestinationRequest.getApiDestinationName(),
accountAPI, ctx);
+ return new CreateApiDestinationResponse(
+
apiDestinationService.createApiDestination(apiDestinationDTO)).success();
+ });
}
@WebLog
@PostMapping("updateApiDestination")
- public UpdateApiDestinationResponse updateApiDestination(@RequestBody
UpdateApiDestinationRequest updateApiDestinationRequest) {
- final Set<ConstraintViolation<UpdateApiDestinationRequest>> validate =
validator.validate(updateApiDestinationRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new
UpdateApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
- }
- ApiDestinationDTO apiDestinationDTO =
getEventApiDestination(updateApiDestinationRequest.getHttpApiParameters(),
updateApiDestinationRequest.getDescription(),
updateApiDestinationRequest.getConnectionName(),
updateApiDestinationRequest.getInvocationRateLimitPerSecond(),
updateApiDestinationRequest.getApiDestinationName(), accountAPI);
- apiDestinationService.updateApiDestination(apiDestinationDTO);
- return new UpdateApiDestinationResponse().success();
+ public Mono<UpdateApiDestinationResponse> updateApiDestination(
+ @RequestBody UpdateApiDestinationRequest updateApiDestinationRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<UpdateApiDestinationRequest>>
validate = validator.validate(
+ updateApiDestinationRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new
UpdateApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
+ }
+ ApiDestinationDTO apiDestinationDTO = getEventApiDestination(
+ updateApiDestinationRequest.getHttpApiParameters(),
updateApiDestinationRequest.getDescription(),
+ updateApiDestinationRequest.getConnectionName(),
+
updateApiDestinationRequest.getInvocationRateLimitPerSecond(),
+ updateApiDestinationRequest.getApiDestinationName(),
accountAPI, ctx);
+ apiDestinationService.updateApiDestination(apiDestinationDTO);
+ return new UpdateApiDestinationResponse().success();
+ });
}
@WebLog
@PostMapping("getApiDestination")
- public GetApiDestinationResponse getApiDestination(@RequestBody
GetApiDestinationRequest getApiDestinationRequest) {
- final Set<ConstraintViolation<GetApiDestinationRequest>> validate =
validator.validate(getApiDestinationRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new GetApiDestinationResponse(null, null, null, null,
null).parameterCheckFailRes(errMessage.toString());
- }
- final ApiDestinationDTO apiDestinationDTO =
apiDestinationService.getApiDestination(accountAPI.getResourceOwnerAccountId(),
getApiDestinationRequest.getApiDestinationName());
- return new GetApiDestinationResponse(apiDestinationDTO.getName(),
apiDestinationDTO.getConnectionName(), apiDestinationDTO.getDescription(),
apiDestinationDTO.getApiParams(),
apiDestinationDTO.getInvocationRateLimitPerSecond()).success();
+ public Mono<GetApiDestinationResponse> getApiDestination(
+ @RequestBody GetApiDestinationRequest getApiDestinationRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<GetApiDestinationRequest>>
validate = validator.validate(
+ getApiDestinationRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new GetApiDestinationResponse(null, null, null,
null, null).parameterCheckFailRes(
+ errMessage.toString());
+ }
+ final ApiDestinationDTO apiDestinationDTO =
apiDestinationService.getApiDestination(
+ accountAPI.getResourceOwnerAccountId(ctx),
getApiDestinationRequest.getApiDestinationName());
+ return new
GetApiDestinationResponse(apiDestinationDTO.getName(),
apiDestinationDTO.getConnectionName(),
+ apiDestinationDTO.getDescription(),
apiDestinationDTO.getApiParams(),
+
apiDestinationDTO.getInvocationRateLimitPerSecond()).success();
+ });
}
@WebLog
@PostMapping("deleteApiDestination")
- public DeleteApiDestinationResponse deleteApiDestination(@RequestBody
DeleteApiDestinationRequest deleteApiDestinationRequest) {
- final Set<ConstraintViolation<DeleteApiDestinationRequest>> validate =
validator.validate(deleteApiDestinationRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new
DeleteApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
- }
-
apiDestinationService.deleteApiDestination(accountAPI.getResourceOwnerAccountId(),
deleteApiDestinationRequest.getApiDestinationName());
- return new DeleteApiDestinationResponse().success();
+ public Mono<DeleteApiDestinationResponse> deleteApiDestination(
+ @RequestBody DeleteApiDestinationRequest deleteApiDestinationRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<DeleteApiDestinationRequest>>
validate = validator.validate(
+ deleteApiDestinationRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new
DeleteApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
+ }
+
apiDestinationService.deleteApiDestination(accountAPI.getResourceOwnerAccountId(ctx),
+ deleteApiDestinationRequest.getApiDestinationName());
+ return new DeleteApiDestinationResponse().success();
+ });
}
@WebLog
@PostMapping("listApiDestinations")
- public ListApiDestinationsResponse listApiDestinations(@RequestBody
ListApiDestinationsRequest listApiDestinationsRequest) {
- final Set<ConstraintViolation<ListApiDestinationsRequest>> validate =
validator.validate(listApiDestinationsRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new ListApiDestinationsResponse(null, null, null,
0).parameterCheckFailRes(errMessage.toString());
- }
- final PaginationResult<List<ApiDestinationDTO>> listPaginationResult =
apiDestinationService.listApiDestinations(accountAPI.getResourceOwnerAccountId(),
- listApiDestinationsRequest.getApiDestinationNamePrefix(),
listApiDestinationsRequest.getNextToken(),
listApiDestinationsRequest.getMaxResults());
- List<ApiDestinationsResponse> apiDestinationsResponses =
Lists.newArrayList();
- listPaginationResult.getData()
- .forEach(eventApiDestination -> {
- ApiDestinationsResponse apiDestinationsResponse = new
ApiDestinationsResponse();
- BeanUtils.copyProperties(eventApiDestination,
apiDestinationsResponse);
-
apiDestinationsResponse.setApiDestinationName(eventApiDestination.getName());
-
apiDestinationsResponse.setHttpApiParameters(eventApiDestination.getApiParams());
- apiDestinationsResponses.add(apiDestinationsResponse);
- });
- return new ListApiDestinationsResponse(apiDestinationsResponses,
listPaginationResult.getNextToken(), listPaginationResult.getTotal(),
listApiDestinationsRequest.getMaxResults()).success();
+ public Mono<ListApiDestinationsResponse> listApiDestinations(
+ @RequestBody ListApiDestinationsRequest listApiDestinationsRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<ListApiDestinationsRequest>>
validate = validator.validate(
+ listApiDestinationsRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new ListApiDestinationsResponse(null, null, null,
0).parameterCheckFailRes(
+ errMessage.toString());
+ }
+ final PaginationResult<List<ApiDestinationDTO>>
listPaginationResult
+ =
apiDestinationService.listApiDestinations(accountAPI.getResourceOwnerAccountId(ctx),
+ listApiDestinationsRequest.getApiDestinationNamePrefix(),
listApiDestinationsRequest.getNextToken(),
+ listApiDestinationsRequest.getMaxResults());
+ List<ApiDestinationsResponse> apiDestinationsResponses =
Lists.newArrayList();
+ listPaginationResult.getData()
+ .forEach(eventApiDestination -> {
+ ApiDestinationsResponse apiDestinationsResponse = new
ApiDestinationsResponse();
+ BeanUtils.copyProperties(eventApiDestination,
apiDestinationsResponse);
+
apiDestinationsResponse.setApiDestinationName(eventApiDestination.getName());
+
apiDestinationsResponse.setHttpApiParameters(eventApiDestination.getApiParams());
+ apiDestinationsResponses.add(apiDestinationsResponse);
+ });
+ return new
ListApiDestinationsResponse(apiDestinationsResponses,
listPaginationResult.getNextToken(),
+ listPaginationResult.getTotal(),
listApiDestinationsRequest.getMaxResults()).success();
+ });
}
- private ApiDestinationDTO getEventApiDestination(HttpApiParameters
apiParams, String description, String connectionName, Integer
invocationRateLimitPerSecond, String name, AccountAPI accountAPI) {
+ private ApiDestinationDTO getEventApiDestination(HttpApiParameters
apiParams, String description,
+ String connectionName, Integer invocationRateLimitPerSecond, String
name, AccountAPI accountAPI, Context ctx) {
ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO();
apiDestinationDTO.setApiParams(apiParams);
apiDestinationDTO.setDescription(description);
apiDestinationDTO.setConnectionName(connectionName);
apiDestinationDTO.setInvocationRateLimitPerSecond(invocationRateLimitPerSecond);
apiDestinationDTO.setName(name);
- apiDestinationDTO.setAccountId(accountAPI.getResourceOwnerAccountId());
+
apiDestinationDTO.setAccountId(accountAPI.getResourceOwnerAccountId(ctx));
return apiDestinationDTO;
}
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
index 357b2df..63be555 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
@@ -17,6 +17,14 @@
package org.apache.rocketmq.eventbridge.adapter.api.controller;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Resource;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+
import com.google.common.collect.Lists;
import org.apache.rocketmq.eventbridge.adapter.api.annotations.WebLog;
import org.apache.rocketmq.eventbridge.adapter.api.dto.BaseRequest;
@@ -35,8 +43,8 @@ import
org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnecti
import
org.apache.rocketmq.eventbridge.domain.common.enums.AuthorizationTypeEnum;
import org.apache.rocketmq.eventbridge.domain.common.enums.NetworkTypeEnum;
import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
-import
org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService;
import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionDTO;
+import
org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService;
import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
import org.springframework.beans.BeanUtils;
import org.springframework.util.CollectionUtils;
@@ -44,14 +52,8 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-
-import javax.annotation.Resource;
-import javax.validation.ConstraintViolation;
-import javax.validation.Validator;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
@RestController
@RequestMapping("/connection/")
@@ -66,85 +68,129 @@ public class ConnectionController {
@WebLog
@PostMapping("createConnection")
- public CreateConnectionResponse createConnection(@RequestBody
CreateConnectionRequest createConnectionRequest) {
- final Set<ConstraintViolation<CreateConnectionRequest>> validate =
validator.validate(createConnectionRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new
CreateConnectionResponse(null).parameterCheckFailRes(errMessage.toString());
- }
- ConnectionDTO connectionDTO =
getEventConnectionWithBLOBs(createConnectionRequest);
- return new
CreateConnectionResponse(connectionService.createConnection(connectionDTO)).success();
+ public Mono<CreateConnectionResponse> createConnection(
+ @RequestBody CreateConnectionRequest createConnectionRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<CreateConnectionRequest>>
validate = validator.validate(
+ createConnectionRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new
CreateConnectionResponse(null).parameterCheckFailRes(errMessage.toString());
+ }
+ ConnectionDTO connectionDTO = getEventConnectionWithBLOBs(ctx,
createConnectionRequest);
+ return new
CreateConnectionResponse(connectionService.createConnection(connectionDTO)).success();
+ });
}
@WebLog
@PostMapping("deleteConnection")
- public DeleteConnectionResponse deleteConnection(@RequestBody
DeleteConnectionRequest deleteConnectionRequest) {
- final Set<ConstraintViolation<DeleteConnectionRequest>> validate =
validator.validate(deleteConnectionRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new
DeleteConnectionResponse().parameterCheckFailRes(errMessage.toString());
- }
-
connectionService.deleteConnection(accountAPI.getResourceOwnerAccountId(),
deleteConnectionRequest.getConnectionName());
- return new DeleteConnectionResponse().success();
+ public Mono<DeleteConnectionResponse> deleteConnection(
+ @RequestBody DeleteConnectionRequest deleteConnectionRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<DeleteConnectionRequest>>
validate = validator.validate(
+ deleteConnectionRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new
DeleteConnectionResponse().parameterCheckFailRes(errMessage.toString());
+ }
+
connectionService.deleteConnection(accountAPI.getResourceOwnerAccountId(ctx),
+ deleteConnectionRequest.getConnectionName());
+ return new DeleteConnectionResponse().success();
+ });
}
@WebLog
@PostMapping("updateConnection")
- public UpdateConnectionResponse updateConnection(@RequestBody
UpdateConnectionRequest updateConnectionRequest) {
- final Set<ConstraintViolation<UpdateConnectionRequest>> validate =
validator.validate(updateConnectionRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new
UpdateConnectionResponse().parameterCheckFailRes(errMessage.toString());
- }
- ConnectionDTO connectionDTO =
getEventConnectionWithBLOBs(updateConnectionRequest);
- connectionService.updateConnection(connectionDTO,
accountAPI.getResourceOwnerAccountId());
- return new UpdateConnectionResponse().success();
+ public Mono<UpdateConnectionResponse> updateConnection(
+ @RequestBody UpdateConnectionRequest updateConnectionRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<UpdateConnectionRequest>>
validate = validator.validate(
+ updateConnectionRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new
UpdateConnectionResponse().parameterCheckFailRes(errMessage.toString());
+ }
+ ConnectionDTO connectionDTO = getEventConnectionWithBLOBs(ctx,
updateConnectionRequest);
+ connectionService.updateConnection(connectionDTO,
accountAPI.getResourceOwnerAccountId(ctx));
+ return new UpdateConnectionResponse().success();
+ });
}
@WebLog
@PostMapping("getConnection")
- public GetConnectionResponse getConnection(@RequestBody
GetConnectionRequest getConnectionRequest) {
- final Set<ConstraintViolation<GetConnectionRequest>> validate =
validator.validate(getConnectionRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new GetConnectionResponse(null, null, null,
null).parameterCheckFailRes(errMessage.toString());
- }
- final ConnectionDTO connectionDTO =
connectionService.getConnection(accountAPI.getResourceOwnerAccountId(),
getConnectionRequest.getConnectionName());
- return new GetConnectionResponse(connectionDTO.getConnectionName(),
connectionDTO.getDescription(), connectionDTO.getNetworkParameters(),
connectionDTO.getAuthParameters()).success();
+ public Mono<GetConnectionResponse> getConnection(@RequestBody
GetConnectionRequest getConnectionRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<GetConnectionRequest>> validate
= validator.validate(
+ getConnectionRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new GetConnectionResponse(null, null, null,
null).parameterCheckFailRes(
+ errMessage.toString());
+ }
+ final ConnectionDTO connectionDTO =
connectionService.getConnection(
+ accountAPI.getResourceOwnerAccountId(ctx),
getConnectionRequest.getConnectionName());
+ return new
GetConnectionResponse(connectionDTO.getConnectionName(),
connectionDTO.getDescription(),
+ connectionDTO.getNetworkParameters(),
connectionDTO.getAuthParameters()).success();
+ });
}
@WebLog
@PostMapping("listConnections")
- public ListConnectionResponse listConnections(@RequestBody
ListConnectionRequest listConnectionRequest) {
- final Set<ConstraintViolation<ListConnectionRequest>> validate =
validator.validate(listConnectionRequest);
- List<String> errMessage =
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
- if (!CollectionUtils.isEmpty(errMessage)) {
- return new ListConnectionResponse(null, null, null,
0).parameterCheckFailRes(errMessage.toString());
- }
- final PaginationResult<List<ConnectionDTO>> listPaginationResult =
connectionService.listConnections(accountAPI.getResourceOwnerAccountId(),
- listConnectionRequest.getConnectionNamePrefix(),
listConnectionRequest.getNextToken(), listConnectionRequest.getMaxResults());
- List<ConnectionResponse> connectionResponses = Lists.newArrayList();
- listPaginationResult.getData()
- .forEach(connectionDTO -> {
- ConnectionResponse connectionResponse = new
ConnectionResponse();
- BeanUtils.copyProperties(connectionDTO,
connectionResponse);
- connectionResponses.add(connectionResponse);
- });
- return new ListConnectionResponse(connectionResponses,
listPaginationResult.getNextToken(), listPaginationResult.getTotal(),
listConnectionRequest.getMaxResults()).success();
+ public Mono<ListConnectionResponse> listConnections(@RequestBody
ListConnectionRequest listConnectionRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ final Set<ConstraintViolation<ListConnectionRequest>> validate
= validator.validate(
+ listConnectionRequest);
+ List<String> errMessage = validate.stream()
+ .map(ConstraintViolation::getMessage)
+ .collect(Collectors.toList());
+ if (!CollectionUtils.isEmpty(errMessage)) {
+ return new ListConnectionResponse(null, null, null,
0).parameterCheckFailRes(errMessage.toString());
+ }
+ final PaginationResult<List<ConnectionDTO>>
listPaginationResult = connectionService.listConnections(
+ accountAPI.getResourceOwnerAccountId(ctx),
listConnectionRequest.getConnectionNamePrefix(),
+ listConnectionRequest.getNextToken(),
listConnectionRequest.getMaxResults());
+ List<ConnectionResponse> connectionResponses =
Lists.newArrayList();
+ listPaginationResult.getData()
+ .forEach(connectionDTO -> {
+ ConnectionResponse connectionResponse = new
ConnectionResponse();
+ BeanUtils.copyProperties(connectionDTO,
connectionResponse);
+ connectionResponses.add(connectionResponse);
+ });
+ return new ListConnectionResponse(connectionResponses,
listPaginationResult.getNextToken(),
+ listPaginationResult.getTotal(),
listConnectionRequest.getMaxResults()).success();
+ });
}
@PostMapping("listEnumsResponse")
- public ListEnumsResponse listEnumsResponse() {
- ListEnumsResponse listEnumsResponse = new ListEnumsResponse();
-
listEnumsResponse.setAuthorizationTypeEnums(Arrays.stream(AuthorizationTypeEnum.values()).collect(Collectors.toList()));
-
listEnumsResponse.setNetworkTypeEnums(Arrays.stream(NetworkTypeEnum.values()).collect(Collectors.toList()));
- return listEnumsResponse.success();
+ public Mono<ListEnumsResponse> listEnumsResponse() {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ ListEnumsResponse listEnumsResponse = new ListEnumsResponse();
+
listEnumsResponse.setAuthorizationTypeEnums(Arrays.stream(AuthorizationTypeEnum.values())
+ .collect(Collectors.toList()));
+
listEnumsResponse.setNetworkTypeEnums(Arrays.stream(NetworkTypeEnum.values())
+ .collect(Collectors.toList()));
+ return listEnumsResponse.success();
+ });
}
- private ConnectionDTO getEventConnectionWithBLOBs(BaseRequest baseRequest)
{
+ private ConnectionDTO getEventConnectionWithBLOBs(Context ctx, BaseRequest
baseRequest) {
ConnectionDTO connectionDTO = new ConnectionDTO();
BeanUtils.copyProperties(baseRequest, connectionDTO);
- connectionDTO.setAccountId(accountAPI.getResourceOwnerAccountId());
+ connectionDTO.setAccountId(accountAPI.getResourceOwnerAccountId(ctx));
return connectionDTO;
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
index 5424b34..1af1404 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
@@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/bus/")
@@ -54,42 +55,54 @@ public class EventBusController {
AccountAPI accountAPI;
@PostMapping(value = {"createEventBus"})
- public CreateEventBusResponse createEventBus(@RequestBody
CreateEventBusRequest createEventBusRequest) {
- eventBusService.createEventBus(accountAPI.getResourceOwnerAccountId(),
createEventBusRequest.getEventBusName(),
- createEventBusRequest.getDescription());
- return new
CreateEventBusResponse(createEventBusRequest.getEventBusName());
+ public Mono<CreateEventBusResponse> createEventBus(@RequestBody
CreateEventBusRequest createEventBusRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventBusService.createEventBus(accountAPI.getResourceOwnerAccountId(ctx),
+ createEventBusRequest.getEventBusName(),
createEventBusRequest.getDescription());
+ return new
CreateEventBusResponse(createEventBusRequest.getEventBusName());
+ });
}
@PostMapping(value = {"getEventBus"})
- public GetEventBusResponse getEventBus(@RequestBody GetEventBusRequest
getEventBusRequest) {
- EventBus eventbus =
eventBusService.getEventBus(accountAPI.getResourceOwnerAccountId(),
- getEventBusRequest.getEventBusName());
- return new GetEventBusResponse(eventbus.getName(),
eventbus.getDescription(), eventbus.getGmtCreate()
- .getTime());
+ public Mono<GetEventBusResponse> getEventBus(@RequestBody
GetEventBusRequest getEventBusRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventBus eventbus =
eventBusService.getEventBus(accountAPI.getResourceOwnerAccountId(ctx),
+ getEventBusRequest.getEventBusName());
+ return new GetEventBusResponse(eventbus.getName(),
eventbus.getDescription(), eventbus.getGmtCreate()
+ .getTime());
+ });
}
@PostMapping(value = {"deleteEventBus"})
- public DeleteEventBusResponse deleteEventBus(@RequestBody
DeleteEventBusRequest deleteEventBusRequest) {
-
eventBusDomainService.deleteEventBusCheckDependencies(accountAPI.getResourceOwnerAccountId(),
- deleteEventBusRequest.getEventBusName());
- return new DeleteEventBusResponse();
+ public Mono<DeleteEventBusResponse> deleteEventBus(@RequestBody
DeleteEventBusRequest deleteEventBusRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventBusDomainService.deleteEventBusCheckDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+ deleteEventBusRequest.getEventBusName());
+ return new DeleteEventBusResponse();
+ });
}
@PostMapping(value = {"listEventBuses"})
- public ListEventBusesResponse listEventBuses(@RequestBody
ListEventBusesRequest listEventBusesRequest) {
- PaginationResult<List<EventBus>> paginationResult =
eventBusService.listEventBuses(
- accountAPI.getResourceOwnerAccountId(),
listEventBusesRequest.getNextToken(),
- listEventBusesRequest.getMaxResults());
- List<EventBusDTO> eventBuses = Lists.newArrayList();
- paginationResult.getData()
- .forEach(eventBus -> {
- EventBusDTO eventBusDTO = new EventBusDTO();
- eventBusDTO.setEventBusName(eventBusDTO.getEventBusName());
- eventBusDTO.setDescription(eventBus.getDescription());
- eventBuses.add(eventBusDTO);
+ public Mono<ListEventBusesResponse> listEventBuses(@RequestBody
ListEventBusesRequest listEventBusesRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ PaginationResult<List<EventBus>> paginationResult =
eventBusService.listEventBuses(
+ accountAPI.getResourceOwnerAccountId(ctx),
listEventBusesRequest.getNextToken(),
+ listEventBusesRequest.getMaxResults());
+ List<EventBusDTO> eventBuses = Lists.newArrayList();
+ paginationResult.getData()
+ .forEach(eventBus -> {
+ EventBusDTO eventBusDTO = new EventBusDTO();
+
eventBusDTO.setEventBusName(eventBusDTO.getEventBusName());
+ eventBusDTO.setDescription(eventBus.getDescription());
+ eventBuses.add(eventBusDTO);
+ });
+ return new ListEventBusesResponse(eventBuses,
paginationResult.getNextToken(),
+ paginationResult.getTotal(),
listEventBusesRequest.getMaxResults());
});
- return new ListEventBusesResponse(eventBuses,
paginationResult.getNextToken(), paginationResult.getTotal(),
- listEventBusesRequest.getMaxResults());
}
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
index 1c7b7b6..68b1f64 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
@@ -58,21 +58,25 @@ public class EventDataController {
@PostMapping(value = {"putEvents"})
public Mono<PutEventsResponse> putEvents(@RequestHeader Map<String,
String> headers, @RequestBody byte[] body) {
- List<CloudEvent> cloudEvents =
eventConverterAdapter.toEventsRequest(headers, body);
- List<EventBridgeEvent> eventList =
this.converterEventBridgeEvent(cloudEvents);
- return
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
+ return Mono.subscriberContext()
+ .flatMap(ctx -> {
+ List<CloudEvent> cloudEvents =
eventConverterAdapter.toEventsRequest(headers, body);
+ List<EventBridgeEvent> eventList =
this.converterEventBridgeEvent(cloudEvents);
+ return
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(ctx),
eventList);
+ });
}
@RequestMapping(value = {"webhook/putEvents"})
public Mono<PutEventsResponse> putHttpEvents(ServerWebExchange
serverWebExchange,
- @RequestHeader Map<String,
String> headers,
- @RequestBody byte[] body,
- @RequestParam("token") String
token) {
- ServerHttpRequest request = serverWebExchange.getRequest();
- List<CloudEvent> cloudEvents =
httpEventConverter.toEventBridgeEvent(request, body,
- headers, accountAPI.getResourceOwnerAccountId(), token);
- List<EventBridgeEvent> eventList =
this.converterEventBridgeEvent(cloudEvents);
- return
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
+ @RequestHeader Map<String, String> headers, @RequestBody byte[] body,
@RequestParam("token") String token) {
+ return Mono.subscriberContext()
+ .flatMap(ctx -> {
+ ServerHttpRequest request = serverWebExchange.getRequest();
+ List<CloudEvent> cloudEvents =
httpEventConverter.toEventBridgeEvent(request, body, headers,
+ accountAPI.getResourceOwnerAccountId(ctx), token);
+ List<EventBridgeEvent> eventList =
this.converterEventBridgeEvent(cloudEvents);
+ return
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(ctx),
eventList);
+ });
}
private List<EventBridgeEvent> converterEventBridgeEvent(List<CloudEvent>
cloudEvents) {
@@ -91,7 +95,8 @@ public class EventDataController {
.dataschema(cloudEvent.getDataSchema())
.datacontenttype(cloudEvent.getDataContentType())
.time(cloudEvent.getTime())
- .data(cloudEvent.getData().toBytes())
+ .data(cloudEvent.getData()
+ .toBytes())
.build();
if (cloudEvent.getExtensionNames() != null) {
cloudEvent.getExtensionNames()
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
index 2244f0f..c09b5f7 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
@@ -48,6 +48,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/rule/")
@@ -63,83 +64,105 @@ public class EventRuleController {
AccountAPI accountAPI;
@PostMapping(value = {"createEventRule"})
- public CreateRuleResponse createRule(@RequestBody CreateRuleRequest
createRuleRequest) {
-
eventRuleService.createEventRule(accountAPI.getResourceOwnerAccountId(),
createRuleRequest.getEventBusName(),
- createRuleRequest.getEventRuleName(),
createRuleRequest.getDescription(),
- createRuleRequest.getFilterPattern());
- return new CreateRuleResponse(createRuleRequest.getEventRuleName());
+ public Mono<CreateRuleResponse> createRule(@RequestBody CreateRuleRequest
createRuleRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventRuleService.createEventRule(accountAPI.getResourceOwnerAccountId(ctx),
+ createRuleRequest.getEventBusName(),
createRuleRequest.getEventRuleName(),
+ createRuleRequest.getDescription(),
createRuleRequest.getFilterPattern());
+ return new
CreateRuleResponse(createRuleRequest.getEventRuleName());
+ });
}
@PostMapping(value = {"getEventRule"})
- public GetRuleResponse getRule(@RequestBody GetRuleRequest getRuleRequest)
{
- EventRuleDetail eventRuleDetail =
eventRuleDomainService.getEventRuleDetail(
- accountAPI.getResourceOwnerAccountId(),
getRuleRequest.getEventBusName(),
- getRuleRequest.getEventRuleName());
-
- List<EventTargetDTO> eventTargets =
EventTargetDTOConverter.convert(eventRuleDetail.getEventTargets());
- GetRuleResponse getRuleResponse = GetRuleResponse.builder()
- .eventBusName(eventRuleDetail.getEventBusName())
- .eventRuleName(eventRuleDetail.getName())
- .description(eventRuleDetail.getDescription())
- .filterPattern(eventRuleDetail.getFilterPattern())
- .status(eventRuleDetail.getStatus())
- .gmtCreate(eventRuleDetail.getGmtCreate())
- .gmtModify(eventRuleDetail.getGmtModify())
- .eventTargets(eventTargets)
- .build();
- return getRuleResponse;
+ public Mono<GetRuleResponse> getRule(@RequestBody GetRuleRequest
getRuleRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventRuleDetail eventRuleDetail =
eventRuleDomainService.getEventRuleDetail(
+ accountAPI.getResourceOwnerAccountId(ctx),
getRuleRequest.getEventBusName(),
+ getRuleRequest.getEventRuleName());
+
+ List<EventTargetDTO> eventTargets =
EventTargetDTOConverter.convert(eventRuleDetail.getEventTargets());
+ GetRuleResponse getRuleResponse = GetRuleResponse.builder()
+ .eventBusName(eventRuleDetail.getEventBusName())
+ .eventRuleName(eventRuleDetail.getName())
+ .description(eventRuleDetail.getDescription())
+ .filterPattern(eventRuleDetail.getFilterPattern())
+ .status(eventRuleDetail.getStatus())
+ .gmtCreate(eventRuleDetail.getGmtCreate())
+ .gmtModify(eventRuleDetail.getGmtModify())
+ .eventTargets(eventTargets)
+ .build();
+ return getRuleResponse;
+ });
}
@PostMapping(value = {"deleteEventRule"})
- public DeleteRuleResponse deleteRule(@RequestBody DeleteRuleRequest
deleteRuleRequest) {
-
eventRuleDomainService.deleteEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
- deleteRuleRequest.getEventBusName(),
deleteRuleRequest.getEventRuleName());
- return new DeleteRuleResponse();
+ public Mono<DeleteRuleResponse> deleteRule(@RequestBody DeleteRuleRequest
deleteRuleRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventRuleDomainService.deleteEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+ deleteRuleRequest.getEventBusName(),
deleteRuleRequest.getEventRuleName());
+ return new DeleteRuleResponse();
+ });
}
@PostMapping(value = {"updateEventRule"})
- public UpdateRuleResponse updateRule(@RequestBody UpdateRuleRequest
updateRuleRequest) {
-
eventRuleDomainService.updateEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
- updateRuleRequest.getEventBusName(),
updateRuleRequest.getEventRuleName(),
- updateRuleRequest.getDescription(),
updateRuleRequest.getFilterPattern());
- return new UpdateRuleResponse();
+ public Mono<UpdateRuleResponse> updateRule(@RequestBody UpdateRuleRequest
updateRuleRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventRuleDomainService.updateEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+
+ updateRuleRequest.getEventBusName(),
updateRuleRequest.getEventRuleName(),
+ updateRuleRequest.getDescription(),
updateRuleRequest.getFilterPattern());
+ return new UpdateRuleResponse();
+ });
}
@PostMapping(value = {"listEventRules"})
- public ListRulesResponse listRules(@RequestBody ListRulesRequest
listRulesRequest) {
- PaginationResult<List<EventRule>> paginationResult =
eventRuleService.listEventRules(
- accountAPI.getResourceOwnerAccountId(),
listRulesRequest.getEventBusName(), listRulesRequest.getNextToken(),
- listRulesRequest.getMaxResults());
- List<EventRuleDTO> eventRules = Lists.newArrayList();
- paginationResult.getData()
- .forEach(eventRule -> {
- EventRuleDTO eventRuleDTO = EventRuleDTO.builder()
- .eventBusName(eventRule.getEventBusName())
- .eventRuleName(eventRule.getName())
- .description(eventRule.getDescription())
- .filterPattern(eventRule.getFilterPattern())
- .status(eventRule.getStatus())
- .gmtCreate(eventRule.getGmtCreate())
- .gmtModify(eventRule.getGmtModify())
- .build();
- eventRules.add(eventRuleDTO);
+ public Mono<ListRulesResponse> listRules(@RequestBody ListRulesRequest
listRulesRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ PaginationResult<List<EventRule>> paginationResult =
eventRuleService.listEventRules(
+ accountAPI.getResourceOwnerAccountId(ctx),
listRulesRequest.getEventBusName(),
+ listRulesRequest.getNextToken(),
listRulesRequest.getMaxResults());
+ List<EventRuleDTO> eventRules = Lists.newArrayList();
+ paginationResult.getData()
+ .forEach(eventRule -> {
+ EventRuleDTO eventRuleDTO = EventRuleDTO.builder()
+ .eventBusName(eventRule.getEventBusName())
+ .eventRuleName(eventRule.getName())
+ .description(eventRule.getDescription())
+ .filterPattern(eventRule.getFilterPattern())
+ .status(eventRule.getStatus())
+ .gmtCreate(eventRule.getGmtCreate())
+ .gmtModify(eventRule.getGmtModify())
+ .build();
+ eventRules.add(eventRuleDTO);
+ });
+ return new ListRulesResponse(eventRules,
paginationResult.getNextToken(), paginationResult.getTotal(),
+ listRulesRequest.getMaxResults());
});
- return new ListRulesResponse(eventRules,
paginationResult.getNextToken(), paginationResult.getTotal(),
- listRulesRequest.getMaxResults());
}
@PostMapping(value = {"enableEventRule"})
- public EnableRuleResponse enableRule(@RequestBody EnableRuleRequest
enableRuleRequest) {
-
eventRuleDomainService.enableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
- enableRuleRequest.getEventBusName(),
enableRuleRequest.getEventRuleName());
- return new EnableRuleResponse();
+ public Mono<EnableRuleResponse> enableRule(@RequestBody EnableRuleRequest
enableRuleRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventRuleDomainService.enableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+ enableRuleRequest.getEventBusName(),
enableRuleRequest.getEventRuleName());
+ return new EnableRuleResponse();
+ });
}
@PostMapping(value = {"disableEventRule"})
- public DisableRuleResponse disableRule(@RequestBody DisableRuleRequest
disableRuleRequest) {
-
eventRuleDomainService.disableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
- disableRuleRequest.getEventBusName(),
disableRuleRequest.getEventRuleName());
- return new DisableRuleResponse();
+ public Mono<DisableRuleResponse> disableRule(@RequestBody
DisableRuleRequest disableRuleRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventRuleDomainService.disableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+ disableRuleRequest.getEventBusName(),
disableRuleRequest.getEventRuleName());
+ return new DisableRuleResponse();
+ });
}
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
index 7ad1e0a..d073be9 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
@@ -42,6 +42,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/source/")
@@ -54,69 +55,88 @@ public class EventSourceController {
EventSourceServiceFactory eventSourceServiceFactory;
@PostMapping(value = {"createEventSource"})
- public CreateEventSourceResponse createEventSource(@RequestBody
CreateEventSourceRequest createEventSourceRequest) {
- EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
- EventSourceTypeEnum.USER_DEFINED.name(),
createEventSourceRequest.getClassName());
-
eventSourceService.createEventSource(accountAPI.getResourceOwnerAccountId(),
- createEventSourceRequest.getEventBusName(),
createEventSourceRequest.getEventSourceName(),
- createEventSourceRequest.getDescription(),
createEventSourceRequest.getClassName(),
- createEventSourceRequest.getConfig());
- return new
CreateEventSourceResponse(createEventSourceRequest.getEventSourceName());
+ public Mono<CreateEventSourceResponse> createEventSource(
+ @RequestBody CreateEventSourceRequest createEventSourceRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
+ EventSourceTypeEnum.USER_DEFINED.name(),
createEventSourceRequest.getClassName());
+
eventSourceService.createEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+ createEventSourceRequest.getEventBusName(),
createEventSourceRequest.getEventSourceName(),
+ createEventSourceRequest.getDescription(),
createEventSourceRequest.getClassName(),
+ createEventSourceRequest.getConfig());
+ return new
CreateEventSourceResponse(createEventSourceRequest.getEventSourceName());
+ });
}
@PostMapping(value = {"updateEventSource"})
- public UpdateEventSourceResponse updateEventSource(@RequestBody
UpdateEventSourceRequest updateEventSourceRequest) {
- EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
- accountAPI.getResourceOwnerAccountId(),
updateEventSourceRequest.getEventBusName(),
- updateEventSourceRequest.getEventSourceName());
-
eventSourceService.updateEventSource(accountAPI.getResourceOwnerAccountId(),
- updateEventSourceRequest.getEventBusName(),
updateEventSourceRequest.getEventSourceName(),
- updateEventSourceRequest.getDescription(),
updateEventSourceRequest.getClassName(),
- updateEventSourceRequest.getStatus(),
updateEventSourceRequest.getConfig());
- return new UpdateEventSourceResponse();
+ public Mono<UpdateEventSourceResponse> updateEventSource(
+ @RequestBody UpdateEventSourceRequest updateEventSourceRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
+ accountAPI.getResourceOwnerAccountId(ctx),
updateEventSourceRequest.getEventBusName(),
+ updateEventSourceRequest.getEventSourceName());
+
eventSourceService.updateEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+ updateEventSourceRequest.getEventBusName(),
updateEventSourceRequest.getEventSourceName(),
+ updateEventSourceRequest.getDescription(),
updateEventSourceRequest.getClassName(),
+ updateEventSourceRequest.getStatus(),
updateEventSourceRequest.getConfig());
+ return new UpdateEventSourceResponse();
+ });
}
@PostMapping(value = {"deleteEventSource"})
- public DeleteEventSourceResponse deleteEventSource(@RequestBody
DeleteEventSourceRequest deleteEventSourceRequest) {
- EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
- accountAPI.getResourceOwnerAccountId(),
deleteEventSourceRequest.getEventBusName(),
- deleteEventSourceRequest.getEventSourceName());
-
eventSourceService.deleteEventSource(accountAPI.getResourceOwnerAccountId(),
- deleteEventSourceRequest.getEventBusName(),
deleteEventSourceRequest.getEventSourceName());
- return new DeleteEventSourceResponse();
+ public Mono<DeleteEventSourceResponse> deleteEventSource(
+ @RequestBody DeleteEventSourceRequest deleteEventSourceRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
+ accountAPI.getResourceOwnerAccountId(ctx),
deleteEventSourceRequest.getEventBusName(),
+ deleteEventSourceRequest.getEventSourceName());
+
eventSourceService.deleteEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+ deleteEventSourceRequest.getEventBusName(),
deleteEventSourceRequest.getEventSourceName());
+ return new DeleteEventSourceResponse();
+ });
}
@PostMapping(value = {"getEventSource"})
- public GetEventSourceResponse getEventSource(@RequestBody
GetEventSourceRequest getEventSourceRequest) {
- EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
- accountAPI.getResourceOwnerAccountId(),
getEventSourceRequest.getEventBusName(),
- getEventSourceRequest.getEventSourceName());
- EventSource eventSource =
eventSourceService.getEventSource(accountAPI.getResourceOwnerAccountId(),
- getEventSourceRequest.getEventBusName(),
getEventSourceRequest.getEventSourceName());
- return new GetEventSourceResponse(eventSource.getEventBusName(),
eventSource.getName(),
- eventSource.getDescription(), eventSource.getClassName(),
eventSource.getConfig());
+ public Mono<GetEventSourceResponse> getEventSource(@RequestBody
GetEventSourceRequest getEventSourceRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventSourceService eventSourceService =
eventSourceServiceFactory.getEventSourceService(
+ accountAPI.getResourceOwnerAccountId(ctx),
getEventSourceRequest.getEventBusName(),
+ getEventSourceRequest.getEventSourceName());
+ EventSource eventSource =
eventSourceService.getEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+ getEventSourceRequest.getEventBusName(),
getEventSourceRequest.getEventSourceName());
+ return new
GetEventSourceResponse(eventSource.getEventBusName(), eventSource.getName(),
+ eventSource.getDescription(), eventSource.getClassName(),
eventSource.getConfig());
+ });
}
@PostMapping(value = {"listEventSources"})
- public ListEventSourcesResponse listEventSources(@RequestBody
ListEventSourcesRequest listEventSourcesRequest) {
- EventSourceService eventSourceService =
eventSourceServiceFactory.getDefaultEventSourceService();
- PaginationResult<List<EventSource>> paginationResult =
eventSourceService.listEventSources(
- accountAPI.getResourceOwnerAccountId(),
listEventSourcesRequest.getEventBusName(),
- listEventSourcesRequest.getNextToken(),
listEventSourcesRequest.getMaxResults());
- List<EventSourceDTO> eventSourceDTOS = Lists.newArrayList();
- paginationResult.getData()
- .forEach(eventSource -> {
- EventSourceDTO eventSourceDTO = EventSourceDTO.builder()
- .eventBusName(eventSource.getEventBusName())
- .eventSourceName(eventSource.getName())
- .description(eventSource.getDescription())
- .gmtCreate(eventSource.getGmtCreate())
- .gmtModify(eventSource.getGmtModify())
- .build();
- eventSourceDTOS.add(eventSourceDTO);
+ public Mono<ListEventSourcesResponse> listEventSources(
+ @RequestBody ListEventSourcesRequest listEventSourcesRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ EventSourceService eventSourceService =
eventSourceServiceFactory.getDefaultEventSourceService();
+ PaginationResult<List<EventSource>> paginationResult =
eventSourceService.listEventSources(
+ accountAPI.getResourceOwnerAccountId(ctx),
listEventSourcesRequest.getEventBusName(),
+ listEventSourcesRequest.getNextToken(),
listEventSourcesRequest.getMaxResults());
+ List<EventSourceDTO> eventSourceDTOS = Lists.newArrayList();
+ paginationResult.getData()
+ .forEach(eventSource -> {
+ EventSourceDTO eventSourceDTO =
EventSourceDTO.builder()
+ .eventBusName(eventSource.getEventBusName())
+ .eventSourceName(eventSource.getName())
+ .description(eventSource.getDescription())
+ .gmtCreate(eventSource.getGmtCreate())
+ .gmtModify(eventSource.getGmtModify())
+ .build();
+ eventSourceDTOS.add(eventSourceDTO);
+ });
+ return new ListEventSourcesResponse(eventSourceDTOS,
paginationResult.getNextToken(),
+ paginationResult.getTotal(),
listEventSourcesRequest.getMaxResults());
});
- return new ListEventSourcesResponse(eventSourceDTOS,
paginationResult.getNextToken(),
- paginationResult.getTotal(),
listEventSourcesRequest.getMaxResults());
}
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
index d8607a8..313d99f 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
@@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/target/")
@@ -51,44 +52,57 @@ public class EventTargetController {
AccountAPI accountAPI;
@PostMapping(value = {"createEventTargets"})
- public CreateTargetsResponse createTargets(@RequestBody
CreateTargetsRequest createTargetsRequest) {
- List<EventTarget> eventTargetList =
EventTargetConverter.convertEventTargets(
- accountAPI.getResourceOwnerAccountId(),
createTargetsRequest.getEventBusName(),
- createTargetsRequest.getEventRuleName(),
createTargetsRequest.getEventTargets());
-
eventTargetService.createTargets(accountAPI.getResourceOwnerAccountId(),
createTargetsRequest.getEventBusName(),
- createTargetsRequest.getEventRuleName(), eventTargetList);
- return new CreateTargetsResponse();
+ public Mono<CreateTargetsResponse> createTargets(@RequestBody
CreateTargetsRequest createTargetsRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ List<EventTarget> eventTargetList =
EventTargetConverter.convertEventTargets(
+ accountAPI.getResourceOwnerAccountId(ctx),
createTargetsRequest.getEventBusName(),
+ createTargetsRequest.getEventRuleName(),
createTargetsRequest.getEventTargets());
+
eventTargetService.createTargets(accountAPI.getResourceOwnerAccountId(ctx),
+ createTargetsRequest.getEventBusName(),
createTargetsRequest.getEventRuleName(), eventTargetList);
+ return new CreateTargetsResponse();
+ });
}
@PostMapping(value = {"updateEventTargets"})
- public UpdateTargetsResponse updateTargets(@RequestBody
UpdateTargetsRequest updateTargetsRequest) {
- List<EventTarget> eventTargetList =
EventTargetConverter.convertEventTargets(
- accountAPI.getResourceOwnerAccountId(),
updateTargetsRequest.getEventBusName(),
- updateTargetsRequest.getEventRuleName(),
updateTargetsRequest.getEventTargets());
-
eventTargetService.updateTargets(accountAPI.getResourceOwnerAccountId(),
updateTargetsRequest.getEventBusName(),
- updateTargetsRequest.getEventRuleName(), eventTargetList);
- return new UpdateTargetsResponse();
+ public Mono<UpdateTargetsResponse> updateTargets(@RequestBody
UpdateTargetsRequest updateTargetsRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ List<EventTarget> eventTargetList =
EventTargetConverter.convertEventTargets(
+ accountAPI.getResourceOwnerAccountId(ctx),
updateTargetsRequest.getEventBusName(),
+ updateTargetsRequest.getEventRuleName(),
updateTargetsRequest.getEventTargets());
+
eventTargetService.updateTargets(accountAPI.getResourceOwnerAccountId(ctx),
+ updateTargetsRequest.getEventBusName(),
updateTargetsRequest.getEventRuleName(), eventTargetList);
+ return new UpdateTargetsResponse();
+ });
}
@PostMapping(value = {"deleteEventTargets"})
- public DeleteTargetsResponse deleteTargets(@RequestBody
DeleteTargetsRequest deleteTargetsRequest) {
-
eventTargetService.deleteTargets(accountAPI.getResourceOwnerAccountId(),
deleteTargetsRequest.getEventBusName(),
- deleteTargetsRequest.getEventRuleName(),
deleteTargetsRequest.getEventTargetNames());
- return new DeleteTargetsResponse();
+ public Mono<DeleteTargetsResponse> deleteTargets(@RequestBody
DeleteTargetsRequest deleteTargetsRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+
eventTargetService.deleteTargets(accountAPI.getResourceOwnerAccountId(ctx),
+ deleteTargetsRequest.getEventBusName(),
deleteTargetsRequest.getEventRuleName(),
+ deleteTargetsRequest.getEventTargetNames());
+ return new DeleteTargetsResponse();
+ });
}
@PostMapping(value = {"listEventTargets"})
- public ListTargetsResponse listEventTargets(@RequestBody
ListTargetsRequest listTargetsRequest) {
- List<EventTarget> eventTargetRunnerList =
eventTargetService.listTargets(accountAPI.getResourceOwnerAccountId(),
- listTargetsRequest.getEventBusName(),
listTargetsRequest.getEventRuleName());
- List<EventTargetDTO> eventTargets = eventTargetRunnerList.stream()
- .map(entry -> EventTargetDTOConverter.convert(entry))
- .collect(Collectors.toList());
- ListTargetsResponse listTargetsResponse = new ListTargetsResponse();
-
listTargetsResponse.setEventBusName(listTargetsRequest.getEventBusName());
-
listTargetsResponse.setEventRuleName(listTargetsRequest.getEventRuleName());
- listTargetsResponse.setEventTargets(eventTargets);
- return listTargetsResponse;
+ public Mono<ListTargetsResponse> listEventTargets(@RequestBody
ListTargetsRequest listTargetsRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ List<EventTarget> eventTargetRunnerList =
eventTargetService.listTargets(
+ accountAPI.getResourceOwnerAccountId(ctx),
listTargetsRequest.getEventBusName(),
+ listTargetsRequest.getEventRuleName());
+ List<EventTargetDTO> eventTargets =
eventTargetRunnerList.stream()
+ .map(entry -> EventTargetDTOConverter.convert(entry))
+ .collect(Collectors.toList());
+ ListTargetsResponse listTargetsResponse = new
ListTargetsResponse();
+
listTargetsResponse.setEventBusName(listTargetsRequest.getEventBusName());
+
listTargetsResponse.setEventRuleName(listTargetsRequest.getEventRuleName());
+ listTargetsResponse.setEventTargets(eventTargets);
+ return listTargetsResponse;
+ });
}
-
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
index 4ac545e..b44a4e3 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
@@ -32,6 +32,7 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/type/")
@@ -44,27 +45,30 @@ public class EventTypeController {
EventTypeService eventTypeService;
@PostMapping(value = {"listEventTypes"})
- public ListEventTypesResponse listEventTypes(@RequestBody
ListEventTypesRequest listEventTypesRequest) {
- PaginationResult<List<EventType>> paginationResult =
eventTypeService.listEventTypes(
- accountAPI.getResourceOwnerAccountId(),
listEventTypesRequest.getEventBusName(),
- listEventTypesRequest.getEventSourceName(),
listEventTypesRequest.getNextToken(),
- listEventTypesRequest.getMaxResults());
+ public Mono<ListEventTypesResponse> listEventTypes(@RequestBody
ListEventTypesRequest listEventTypesRequest) {
+ return Mono.subscriberContext()
+ .map(ctx -> {
+ PaginationResult<List<EventType>> paginationResult =
eventTypeService.listEventTypes(
+ accountAPI.getResourceOwnerAccountId(ctx),
listEventTypesRequest.getEventBusName(),
+ listEventTypesRequest.getEventSourceName(),
listEventTypesRequest.getNextToken(),
+ listEventTypesRequest.getMaxResults());
- List<EventTypeDTO> eventTypeDTOS = Lists.newArrayList();
- paginationResult.getData()
- .forEach(eventType -> {
- EventTypeDTO eventTypeDTO = EventTypeDTO.builder()
- .eventBusName(eventType.getEventBusName())
- .eventSourceName(eventType.getEventSourceName())
- .eventTypeName(eventType.getName())
- .description(eventType.getDescription())
- .gmtCreate(eventType.getGmtCreate())
- .gmtModify(eventType.getGmtModify())
- .build();
- eventTypeDTOS.add(eventTypeDTO);
+ List<EventTypeDTO> eventTypeDTOS = Lists.newArrayList();
+ paginationResult.getData()
+ .forEach(eventType -> {
+ EventTypeDTO eventTypeDTO = EventTypeDTO.builder()
+ .eventBusName(eventType.getEventBusName())
+ .eventSourceName(eventType.getEventSourceName())
+ .eventTypeName(eventType.getName())
+ .description(eventType.getDescription())
+ .gmtCreate(eventType.getGmtCreate())
+ .gmtModify(eventType.getGmtModify())
+ .build();
+ eventTypeDTOS.add(eventTypeDTO);
+ });
+ return new ListEventTypesResponse(eventTypeDTOS,
paginationResult.getNextToken(),
+ paginationResult.getTotal(),
listEventTypesRequest.getMaxResults());
});
- return new ListEventTypesResponse(eventTypeDTOS,
paginationResult.getNextToken(), paginationResult.getTotal(),
- listEventTypesRequest.getMaxResults());
}
}
diff --git
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
index 4ea9405..e15fccc 100644
---
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
+++
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
@@ -42,6 +42,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
@@ -67,83 +68,118 @@ public class ApiDestinationDTOControllerTest {
@Before
public void testBefore() {
-
Mockito.when(accountAPI.getResourceOwnerAccountId()).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+ .thenReturn(UUID.randomUUID()
+ .toString());
}
@Test
public void testCreateApiDestination() {
-
Mockito.when(apiDestinationService.createApiDestination(any())).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(apiDestinationService.createApiDestination(any()))
+ .thenReturn(UUID.randomUUID()
+ .toString());
CreateApiDestinationRequest createApiDestinationRequest = new
CreateApiDestinationRequest();
-
createApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
-
createApiDestinationRequest.setDescription(UUID.randomUUID().toString());
+ createApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ createApiDestinationRequest.setDescription(UUID.randomUUID()
+ .toString());
HttpApiParameters httpApiParameters = new HttpApiParameters();
- httpApiParameters.setEndpoint(UUID.randomUUID().toString());
- httpApiParameters.setMethod(UUID.randomUUID().toString());
+ httpApiParameters.setEndpoint(UUID.randomUUID()
+ .toString());
+ httpApiParameters.setMethod(UUID.randomUUID()
+ .toString());
createApiDestinationRequest.setHttpApiParameters(httpApiParameters);
createApiDestinationRequest.setInvocationRateLimitPerSecond(11);
- final CreateApiDestinationResponse apiDestination =
apiDestinationController.createApiDestination(createApiDestinationRequest);
- Assert.assertEquals(apiDestination.getCode(),
EventBridgeErrorCode.Success.getCode());
+ final Mono<CreateApiDestinationResponse> apiDestination =
apiDestinationController.createApiDestination(
+ createApiDestinationRequest);
+ Assert.assertEquals(apiDestination.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testUpdateApiDestination() {
Set<ConstraintViolation<UpdateApiDestinationRequest>>
constraintViolations = new HashSet<>();
-
Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class))).thenReturn(constraintViolations);
-
Mockito.when(apiDestinationService.updateApiDestination(any())).thenReturn(Boolean.TRUE);
+
Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class)))
+ .thenReturn(constraintViolations);
+ Mockito.when(apiDestinationService.updateApiDestination(any()))
+ .thenReturn(Boolean.TRUE);
UpdateApiDestinationRequest updateApiDestinationRequest = new
UpdateApiDestinationRequest();
-
updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
-
updateApiDestinationRequest.setDescription(UUID.randomUUID().toString());
+ updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ updateApiDestinationRequest.setDescription(UUID.randomUUID()
+ .toString());
HttpApiParameters httpApiParameters = new HttpApiParameters();
- httpApiParameters.setEndpoint(UUID.randomUUID().toString());
- httpApiParameters.setMethod(UUID.randomUUID().toString());
+ httpApiParameters.setEndpoint(UUID.randomUUID()
+ .toString());
+ httpApiParameters.setMethod(UUID.randomUUID()
+ .toString());
updateApiDestinationRequest.setHttpApiParameters(httpApiParameters);
updateApiDestinationRequest.setInvocationRateLimitPerSecond(11);
- final UpdateApiDestinationResponse updateApiDestinationResponse =
apiDestinationController.updateApiDestination(updateApiDestinationRequest);
- Assert.assertEquals(updateApiDestinationResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ final Mono<UpdateApiDestinationResponse> updateApiDestinationResponse
+ =
apiDestinationController.updateApiDestination(updateApiDestinationRequest);
+ Assert.assertEquals(updateApiDestinationResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testGetApiDestination() {
Set<ConstraintViolation<GetApiDestinationRequest>>
constraintViolations = new HashSet<>();
-
Mockito.when(validator.validate(any(GetApiDestinationRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(GetApiDestinationRequest.class)))
+ .thenReturn(constraintViolations);
ApiDestinationDTO eventApiDestinationDTO = new ApiDestinationDTO();
- eventApiDestinationDTO.setName(UUID.randomUUID().toString());
- Mockito.when(apiDestinationService.getApiDestination(any(),
any())).thenReturn(eventApiDestinationDTO);
+ eventApiDestinationDTO.setName(UUID.randomUUID()
+ .toString());
+ Mockito.when(apiDestinationService.getApiDestination(any(), any()))
+ .thenReturn(eventApiDestinationDTO);
GetApiDestinationRequest getApiDestinationRequest = new
GetApiDestinationRequest();
-
getApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
- final GetApiDestinationResponse apiDestination =
apiDestinationController.getApiDestination(getApiDestinationRequest);
- Assert.assertEquals(apiDestination.getCode(),
EventBridgeErrorCode.Success.getCode());
+ getApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ final Mono<GetApiDestinationResponse> apiDestination =
apiDestinationController.getApiDestination(
+ getApiDestinationRequest);
+ Assert.assertEquals(apiDestination.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testDeleteApiDestination() {
Set<ConstraintViolation<DeleteApiDestinationRequest>>
constraintViolations = new HashSet<>();
-
Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class))).thenReturn(constraintViolations);
- Mockito.when(apiDestinationService.deleteApiDestination(any(),
any())).thenReturn(Boolean.TRUE);
+
Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class)))
+ .thenReturn(constraintViolations);
+ Mockito.when(apiDestinationService.deleteApiDestination(any(), any()))
+ .thenReturn(Boolean.TRUE);
DeleteApiDestinationRequest deleteApiDestinationRequest = new
DeleteApiDestinationRequest();
-
deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
- final DeleteApiDestinationResponse deleteApiDestinationResponse =
apiDestinationController.deleteApiDestination(deleteApiDestinationRequest);
- Assert.assertEquals(deleteApiDestinationResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+ .toString());
+ final Mono<DeleteApiDestinationResponse> deleteApiDestinationResponse
+ =
apiDestinationController.deleteApiDestination(deleteApiDestinationRequest);
+ Assert.assertEquals(deleteApiDestinationResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testListApiDestinations() {
Set<ConstraintViolation<ListApiDestinationsRequest>>
constraintViolations = new HashSet<>();
-
Mockito.when(validator.validate(any(ListApiDestinationsRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(ListApiDestinationsRequest.class)))
+ .thenReturn(constraintViolations);
PaginationResult<List<ApiDestinationDTO>> result = new
PaginationResult();
List<ApiDestinationDTO> apiDestinationDTOList = Lists.newArrayList();
ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO();
- apiDestinationDTO.setName(UUID.randomUUID().toString());
+ apiDestinationDTO.setName(UUID.randomUUID()
+ .toString());
apiDestinationDTOList.add(apiDestinationDTO);
result.setData(apiDestinationDTOList);
result.setTotal(9);
result.setNextToken("0");
- Mockito.when(apiDestinationService.listApiDestinations(any(), any(),
any(), anyInt())).thenReturn(result);
+ Mockito.when(apiDestinationService.listApiDestinations(any(), any(),
any(), anyInt()))
+ .thenReturn(result);
ListApiDestinationsRequest listApiDestinationsRequest = new
ListApiDestinationsRequest();
-
listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID().toString());
+
listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID()
+ .toString());
listApiDestinationsRequest.setNextToken("0");
listApiDestinationsRequest.setMaxResults(10);
- final ListApiDestinationsResponse listApiDestinationsResponse =
apiDestinationController.listApiDestinations(listApiDestinationsRequest);
- Assert.assertEquals(listApiDestinationsResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ final Mono<ListApiDestinationsResponse> listApiDestinationsResponse
+ =
apiDestinationController.listApiDestinations(listApiDestinationsRequest);
+ Assert.assertEquals(listApiDestinationsResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
}
diff --git
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
index b21e8ef..e765868 100644
---
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
+++
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
@@ -48,6 +48,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
import javax.validation.ConstraintViolation;
import javax.validation.Validator;
@@ -74,93 +75,135 @@ public class ConnectionControllerTest {
@Before
public void testBefore() throws Exception {
-
Mockito.when(accountAPI.getResourceOwnerAccountId()).thenReturn(UUID.randomUUID().toString());
+ Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+ .thenReturn(UUID.randomUUID()
+ .toString());
}
@Test
public void testCreateConnection() {
-
Mockito.when(connectionService.createConnection(any(ConnectionDTO.class))).thenReturn(UUID.randomUUID().toString());
+
Mockito.when(connectionService.createConnection(any(ConnectionDTO.class)))
+ .thenReturn(UUID.randomUUID()
+ .toString());
Set<ConstraintViolation<CreateConnectionRequest>> constraintViolations
= new HashSet<>();
-
Mockito.when(validator.validate(any(CreateConnectionRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(CreateConnectionRequest.class)))
+ .thenReturn(constraintViolations);
CreateConnectionRequest createConnectionRequest = new
CreateConnectionRequest();
-
createConnectionRequest.setConnectionName(UUID.randomUUID().toString());
- createConnectionRequest.setDescription(UUID.randomUUID().toString());
+ createConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ createConnectionRequest.setDescription(UUID.randomUUID()
+ .toString());
NetworkParameters networkParameters = new NetworkParameters();
networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
- networkParameters.setSecurityGroupId(UUID.randomUUID().toString());
- networkParameters.setVpcId(UUID.randomUUID().toString());
- networkParameters.setVswitcheId(UUID.randomUUID().toString());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
createConnectionRequest.setNetworkParameters(networkParameters);
AuthParameters authParameters = new AuthParameters();
BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
- basicAuthParameters.setPassword(UUID.randomUUID().toString());
- basicAuthParameters.setUsername(UUID.randomUUID().toString());
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
authParameters.setBasicAuthParameters(basicAuthParameters);
authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
createConnectionRequest.setAuthParameters(authParameters);
- final CreateConnectionResponse connection =
connectionController.createConnection(createConnectionRequest);
- Assert.assertEquals(connection.getCode(),
EventBridgeErrorCode.Success.getCode());
+ final Mono<CreateConnectionResponse> connection =
connectionController.createConnection(
+ createConnectionRequest);
+ Assert.assertEquals(connection.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testDeleteConnection() {
-
Mockito.doNothing().when(connectionService).deleteConnection(anyString(),
anyString());
+ Mockito.doNothing()
+ .when(connectionService)
+ .deleteConnection(anyString(), anyString());
Set<ConstraintViolation<DeleteConnectionRequest>> constraintViolations
= new HashSet<>();
-
Mockito.when(validator.validate(any(DeleteConnectionRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(DeleteConnectionRequest.class)))
+ .thenReturn(constraintViolations);
DeleteConnectionRequest deleteConnectionRequest = new
DeleteConnectionRequest();
-
deleteConnectionRequest.setConnectionName(UUID.randomUUID().toString());
- final DeleteConnectionResponse deleteConnectionResponse =
connectionController.deleteConnection(deleteConnectionRequest);
- Assert.assertEquals(deleteConnectionResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ deleteConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ final Mono<DeleteConnectionResponse> deleteConnectionResponse =
connectionController.deleteConnection(
+ deleteConnectionRequest);
+ Assert.assertEquals(deleteConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testUpdateConnection() {
-
Mockito.doNothing().when(connectionService).updateConnection(any(ConnectionDTO.class),
anyString());
+ Mockito.doNothing()
+ .when(connectionService)
+ .updateConnection(any(ConnectionDTO.class), anyString());
Set<ConstraintViolation<UpdateConnectionRequest>> constraintViolations
= new HashSet<>();
-
Mockito.when(validator.validate(any(UpdateConnectionRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(UpdateConnectionRequest.class)))
+ .thenReturn(constraintViolations);
UpdateConnectionRequest updateConnectionRequest = new
UpdateConnectionRequest();
-
updateConnectionRequest.setConnectionName(UUID.randomUUID().toString());
- updateConnectionRequest.setDescription(UUID.randomUUID().toString());
+ updateConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ updateConnectionRequest.setDescription(UUID.randomUUID()
+ .toString());
NetworkParameters networkParameters = new NetworkParameters();
networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
- networkParameters.setSecurityGroupId(UUID.randomUUID().toString());
- networkParameters.setVpcId(UUID.randomUUID().toString());
- networkParameters.setVswitcheId(UUID.randomUUID().toString());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
updateConnectionRequest.setNetworkParameters(networkParameters);
AuthParameters authParameters = new AuthParameters();
BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
- basicAuthParameters.setPassword(UUID.randomUUID().toString());
- basicAuthParameters.setUsername(UUID.randomUUID().toString());
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
authParameters.setBasicAuthParameters(basicAuthParameters);
authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
updateConnectionRequest.setAuthParameters(authParameters);
- final UpdateConnectionResponse updateConnectionResponse =
connectionController.updateConnection(updateConnectionRequest);
- Assert.assertEquals(updateConnectionResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ final Mono<UpdateConnectionResponse> updateConnectionResponse =
connectionController.updateConnection(
+ updateConnectionRequest);
+ Assert.assertEquals(updateConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testGetConnection() {
Set<ConstraintViolation<GetConnectionRequest>> constraintViolations =
new HashSet<>();
-
Mockito.when(validator.validate(any(GetConnectionRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(GetConnectionRequest.class)))
+ .thenReturn(constraintViolations);
final ConnectionDTO connectionDTO = new ConnectionDTO();
NetworkParameters networkParameters = new NetworkParameters();
networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
- networkParameters.setSecurityGroupId(UUID.randomUUID().toString());
- networkParameters.setVpcId(UUID.randomUUID().toString());
- networkParameters.setVswitcheId(UUID.randomUUID().toString());
+ networkParameters.setSecurityGroupId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVpcId(UUID.randomUUID()
+ .toString());
+ networkParameters.setVswitcheId(UUID.randomUUID()
+ .toString());
connectionDTO.setNetworkParameters(networkParameters);
AuthParameters authParameters = new AuthParameters();
BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
- basicAuthParameters.setPassword(UUID.randomUUID().toString());
- basicAuthParameters.setUsername(UUID.randomUUID().toString());
+ basicAuthParameters.setPassword(UUID.randomUUID()
+ .toString());
+ basicAuthParameters.setUsername(UUID.randomUUID()
+ .toString());
authParameters.setBasicAuthParameters(basicAuthParameters);
authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
connectionDTO.setAuthParameters(authParameters);
- BDDMockito.given(connectionService.getConnection(any(),
any())).willReturn(connectionDTO);
+ BDDMockito.given(connectionService.getConnection(any(), any()))
+ .willReturn(connectionDTO);
GetConnectionRequest getConnectionRequest = new GetConnectionRequest();
- getConnectionRequest.setConnectionName(UUID.randomUUID().toString());
- final GetConnectionResponse getConnectionResponse =
connectionController.getConnection(getConnectionRequest);
- Assert.assertEquals(getConnectionResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ getConnectionRequest.setConnectionName(UUID.randomUUID()
+ .toString());
+ final Mono<GetConnectionResponse> getConnectionResponse =
connectionController.getConnection(
+ getConnectionRequest);
+ Assert.assertEquals(getConnectionResponse.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
@@ -168,25 +211,33 @@ public class ConnectionControllerTest {
PaginationResult<List<ConnectionDTO>> result = new PaginationResult();
List<ConnectionDTO> eventConnectionWithBLOBs = Lists.newArrayList();
ConnectionDTO eventConnection = new ConnectionDTO();
- eventConnection.setConnectionName(UUID.randomUUID().toString());
+ eventConnection.setConnectionName(UUID.randomUUID()
+ .toString());
eventConnectionWithBLOBs.add(eventConnection);
result.setData(eventConnectionWithBLOBs);
result.setTotal(9);
result.setNextToken("0");
- Mockito.when(connectionService.listConnections(any(), any(), any(),
anyInt())).thenReturn(result);
+ Mockito.when(connectionService.listConnections(any(), any(), any(),
anyInt()))
+ .thenReturn(result);
Set<ConstraintViolation<ListConnectionRequest>> constraintViolations =
new HashSet<>();
-
Mockito.when(validator.validate(any(ListConnectionRequest.class))).thenReturn(constraintViolations);
+ Mockito.when(validator.validate(any(ListConnectionRequest.class)))
+ .thenReturn(constraintViolations);
ListConnectionRequest listConnectionRequest = new
ListConnectionRequest();
-
listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID().toString());
+ listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID()
+ .toString());
listConnectionRequest.setNextToken("0");
listConnectionRequest.setMaxResults(10);
- final ListConnectionResponse listConnectionResponse =
connectionController.listConnections(listConnectionRequest);
- Assert.assertEquals(listConnectionResponse.getCode(),
EventBridgeErrorCode.Success.getCode());
+ final Mono<ListConnectionResponse> listConnections =
connectionController.listConnections(
+ listConnectionRequest);
+ Assert.assertEquals(listConnections.block()
+ .getCode(), EventBridgeErrorCode.Success.getCode());
}
@Test
public void testListEnumsResponse() {
- final ListEnumsResponse listEnumsResponse =
connectionController.listEnumsResponse();
- Assert.assertEquals(listEnumsResponse.getNetworkTypeEnums().size(),
NetworkTypeEnum.values().length);
+ final Mono<ListEnumsResponse> listEnumsResponse =
connectionController.listEnumsResponse();
+ Assert.assertEquals(listEnumsResponse.block()
+ .getNetworkTypeEnums()
+ .size(), NetworkTypeEnum.values().length);
}
}
diff --git
a/adapter/persistence/src/main/resources/db/migration/V3__register_source_acs_mns.sql
b/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
similarity index 100%
rename from
adapter/persistence/src/main/resources/db/migration/V3__register_source_acs_mns.sql
rename to
adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
diff --git
a/adapter/persistence/src/main/resources/db/migration/V4__register_target_acs_eventbridge.sql
b/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
similarity index 100%
rename from
adapter/persistence/src/main/resources/db/migration/V4__register_target_acs_eventbridge.sql
rename to
adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
diff --git a/adapter/rpc/pom.xml b/adapter/rpc/pom.xml
index 25390fe..e717ede 100644
--- a/adapter/rpc/pom.xml
+++ b/adapter/rpc/pom.xml
@@ -14,6 +14,7 @@
<properties>
<rocketmq.version>4.9.2</rocketmq.version>
<httpclient.version>4.5.13</httpclient.version>
+ <reactor.version>3.1.7.RELEASE</reactor.version>
</properties>
<version>1.0.0-SNAPSHOT</version>
@@ -47,5 +48,10 @@
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ <version>${reactor.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
index b4a28e6..a8c87fa 100644
---
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
+++
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
@@ -19,30 +19,26 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl;
import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
import org.springframework.stereotype.Component;
+import reactor.util.context.Context;
@Component
-public class AccountAPIImpl implements AccountAPI {
+public class AccountAPIImpl implements AccountAPI<Context> {
public static final String HEADER_KEY_LOGIN_ACCOUNT_ID = "loginAccountId";
public static final String HEADER_KEY_PARENT_ACCOUNT_ID =
"parentAccountId";
public static final String HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID =
"resourceOwnerAccountId";
@Override
- public String getLoginAccountId() {
- return null;
+ public String getLoginAccountId(Context ctx) {
+ return ctx.get(HEADER_KEY_LOGIN_ACCOUNT_ID);
}
@Override
- public String getParentAccountId() {
- return null;
+ public String getParentAccountId(Context ctx) {
+ return ctx.get(HEADER_KEY_PARENT_ACCOUNT_ID);
}
@Override
- public String getResourceOwnerAccountId() {
- return "Admin";
- }
-
- @Override
- public boolean isParentAccount() {
- return false;
+ public String getResourceOwnerAccountId(Context ctx) {
+ return ctx.get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
}
}
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
index 5d66444..7f8472e 100644
---
a/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
+++
b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
@@ -21,6 +21,7 @@ public enum DefaultErrorCode implements BaseErrorCode {
//Default
Success(200, "Success", "success"),
InternalError(500, "InternalError", "InternalError"),
+ LoginFailed(409, "LoginFailed", "Login failed."),
;
private final int httpCode;
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
index b19639e..c919aa7 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
@@ -17,13 +17,32 @@
package org.apache.rocketmq.eventbridge.domain.rpc;
-public interface AccountAPI {
+public interface AccountAPI<T> {
- String getLoginAccountId();
+ /**
+ * Return the login accountId.
+ *
+ * @param ctx
+ *
+ * @return
+ */
+ String getLoginAccountId(T ctx);
- String getParentAccountId();
+ /**
+ * Return the parent of login accountId.
+ *
+ * @param ctx
+ *
+ * @return
+ */
+ String getParentAccountId(T ctx);
- String getResourceOwnerAccountId();
-
- boolean isParentAccount();
+ /**
+ * Return the resource owner's accountId.
+ *
+ * @param ctx
+ *
+ * @return
+ */
+ String getResourceOwnerAccountId(T ctx);
}
diff --git
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
index 261504b..c045270 100644
---
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
+++
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
@@ -16,6 +16,11 @@
*/
package org.apache.rocketmq.eventbridge.filter;
+ import java.util.List;
+
+ import lombok.extern.slf4j.Slf4j;
+ import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+ import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
import org.springframework.core.annotation.Order;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
@@ -26,17 +31,33 @@
@Component
@Order(value = 2)
+ @Slf4j
public class LoginFilter implements WebFilter {
+ public static final String HEADER_KEY_LOGIN_ACCOUNT_ID = "loginAccountId";
+ public static final String HEADER_KEY_PARENT_ACCOUNT_ID =
"parentAccountId";
+ public static final String HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID =
"resourceOwnerAccountId";
+
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain
chain) {
ServerHttpRequest request = exchange.getRequest();
- //request.getBody().flatMap(dataBuffer -> {
- // Mono<Void> result = Mono.create(monoSink -> {
- // byte[] bytes = dataBuffer.asByteBuffer().array();
- // });
- // return result;
- //});
- return chain.filter(exchange);
+ return chain.filter(exchange)
+ .subscriberContext(ctx -> {
+ List<String> parentAccountIds = request.getHeaders()
+ .get(HEADER_KEY_PARENT_ACCOUNT_ID);
+ List<String> loginAccountIds = request.getHeaders()
+ .get(HEADER_KEY_LOGIN_ACCOUNT_ID);
+ List<String> resourceOwnerIds = request.getHeaders()
+ .get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
+ if(resourceOwnerIds == null || resourceOwnerIds.isEmpty()){
+ throw new
EventBridgeException(DefaultErrorCode.LoginFailed);
+ }
+ return ctx.put(HEADER_KEY_PARENT_ACCOUNT_ID,
+ parentAccountIds != null && !parentAccountIds.isEmpty() ?
parentAccountIds.get(0) : "")
+ .put(HEADER_KEY_LOGIN_ACCOUNT_ID,
+ loginAccountIds != null && !loginAccountIds.isEmpty()
? loginAccountIds.get(0) : "")
+ .put(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID,
+ resourceOwnerIds != null &&
!resourceOwnerIds.isEmpty() ? resourceOwnerIds.get(0) : "");
+ });
}
}