This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/main by this push:
     new db61a46  feat:Modify API to Mono structure (#27)
db61a46 is described below

commit db61a464321c507ce8eb960029fbaf0445e22fa5
Author: shenlin <[email protected]>
AuthorDate: Tue Sep 6 10:31:53 2022 +0800

    feat:Modify API to Mono structure (#27)
    
    feat:Modify API to Mono structure
---
 .../api/controller/ApiDestinationController.java   | 162 ++++++++++++-------
 .../api/controller/ConnectionController.java       | 176 +++++++++++++--------
 .../adapter/api/controller/EventBusController.java |  65 +++++---
 .../api/controller/EventDataController.java        |  29 ++--
 .../api/controller/EventRuleController.java        | 139 +++++++++-------
 .../api/controller/EventSourceController.java      | 120 ++++++++------
 .../api/controller/EventTargetController.java      |  74 +++++----
 .../api/controller/EventTypeController.java        |  42 ++---
 .../ApiDestinationDTOControllerTest.java           | 102 ++++++++----
 .../api/controller/ConnectionControllerTest.java   | 143 +++++++++++------
 ...acs_mns.sql => V4__register_source_acs_mns.sql} |   0
 ...sql => V6__register_target_acs_eventbridge.sql} |   0
 adapter/rpc/pom.xml                                |   6 +
 .../adapter/rpc/impl/AccountAPIImpl.java           |  20 +--
 .../exception/code/DefaultErrorCode.java           |   1 +
 .../eventbridge/domain/rpc/AccountAPI.java         |  31 +++-
 .../rocketmq/eventbridge/filter/LoginFilter.java   |  35 +++-
 17 files changed, 728 insertions(+), 417 deletions(-)

diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
index a5d32a9..f18e6db 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationController.java
@@ -41,6 +41,8 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
 
 import javax.annotation.Resource;
 import javax.validation.ConstraintViolation;
@@ -62,83 +64,137 @@ public class ApiDestinationController {
 
     @WebLog
     @PostMapping("createApiDestination")
-    public CreateApiDestinationResponse createApiDestination(@RequestBody 
CreateApiDestinationRequest createApiDestinationRequest) {
-        final Set<ConstraintViolation<CreateApiDestinationRequest>> validate = 
validator.validate(createApiDestinationRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new 
CreateApiDestinationResponse(null).parameterCheckFailRes(errMessage.toString());
-        }
-        ApiDestinationDTO apiDestinationDTO = 
getEventApiDestination(createApiDestinationRequest.getHttpApiParameters(), 
createApiDestinationRequest.getDescription(), 
createApiDestinationRequest.getConnectionName(), 
createApiDestinationRequest.getInvocationRateLimitPerSecond(), 
createApiDestinationRequest.getApiDestinationName(), accountAPI);
-        return new 
CreateApiDestinationResponse(apiDestinationService.createApiDestination(apiDestinationDTO)).success();
+    public Mono<CreateApiDestinationResponse> createApiDestination(
+        @RequestBody CreateApiDestinationRequest createApiDestinationRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<CreateApiDestinationRequest>> 
validate = validator.validate(
+                    createApiDestinationRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new 
CreateApiDestinationResponse(null).parameterCheckFailRes(errMessage.toString());
+                }
+                ApiDestinationDTO apiDestinationDTO = getEventApiDestination(
+                    createApiDestinationRequest.getHttpApiParameters(), 
createApiDestinationRequest.getDescription(),
+                    createApiDestinationRequest.getConnectionName(),
+                    
createApiDestinationRequest.getInvocationRateLimitPerSecond(),
+                    createApiDestinationRequest.getApiDestinationName(), 
accountAPI, ctx);
+                return new CreateApiDestinationResponse(
+                    
apiDestinationService.createApiDestination(apiDestinationDTO)).success();
+            });
     }
 
     @WebLog
     @PostMapping("updateApiDestination")
-    public UpdateApiDestinationResponse updateApiDestination(@RequestBody 
UpdateApiDestinationRequest updateApiDestinationRequest) {
-        final Set<ConstraintViolation<UpdateApiDestinationRequest>> validate = 
validator.validate(updateApiDestinationRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new 
UpdateApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
-        }
-        ApiDestinationDTO apiDestinationDTO = 
getEventApiDestination(updateApiDestinationRequest.getHttpApiParameters(), 
updateApiDestinationRequest.getDescription(), 
updateApiDestinationRequest.getConnectionName(), 
updateApiDestinationRequest.getInvocationRateLimitPerSecond(), 
updateApiDestinationRequest.getApiDestinationName(), accountAPI);
-        apiDestinationService.updateApiDestination(apiDestinationDTO);
-        return new UpdateApiDestinationResponse().success();
+    public Mono<UpdateApiDestinationResponse> updateApiDestination(
+        @RequestBody UpdateApiDestinationRequest updateApiDestinationRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<UpdateApiDestinationRequest>> 
validate = validator.validate(
+                    updateApiDestinationRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new 
UpdateApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
+                }
+                ApiDestinationDTO apiDestinationDTO = getEventApiDestination(
+                    updateApiDestinationRequest.getHttpApiParameters(), 
updateApiDestinationRequest.getDescription(),
+                    updateApiDestinationRequest.getConnectionName(),
+                    
updateApiDestinationRequest.getInvocationRateLimitPerSecond(),
+                    updateApiDestinationRequest.getApiDestinationName(), 
accountAPI, ctx);
+                apiDestinationService.updateApiDestination(apiDestinationDTO);
+                return new UpdateApiDestinationResponse().success();
+            });
     }
 
     @WebLog
     @PostMapping("getApiDestination")
-    public GetApiDestinationResponse getApiDestination(@RequestBody 
GetApiDestinationRequest getApiDestinationRequest) {
-        final Set<ConstraintViolation<GetApiDestinationRequest>> validate = 
validator.validate(getApiDestinationRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new GetApiDestinationResponse(null, null, null, null, 
null).parameterCheckFailRes(errMessage.toString());
-        }
-        final ApiDestinationDTO apiDestinationDTO = 
apiDestinationService.getApiDestination(accountAPI.getResourceOwnerAccountId(), 
getApiDestinationRequest.getApiDestinationName());
-        return new GetApiDestinationResponse(apiDestinationDTO.getName(), 
apiDestinationDTO.getConnectionName(), apiDestinationDTO.getDescription(), 
apiDestinationDTO.getApiParams(), 
apiDestinationDTO.getInvocationRateLimitPerSecond()).success();
+    public Mono<GetApiDestinationResponse> getApiDestination(
+        @RequestBody GetApiDestinationRequest getApiDestinationRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<GetApiDestinationRequest>> 
validate = validator.validate(
+                    getApiDestinationRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new GetApiDestinationResponse(null, null, null, 
null, null).parameterCheckFailRes(
+                        errMessage.toString());
+                }
+                final ApiDestinationDTO apiDestinationDTO = 
apiDestinationService.getApiDestination(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
getApiDestinationRequest.getApiDestinationName());
+                return new 
GetApiDestinationResponse(apiDestinationDTO.getName(), 
apiDestinationDTO.getConnectionName(),
+                    apiDestinationDTO.getDescription(), 
apiDestinationDTO.getApiParams(),
+                    
apiDestinationDTO.getInvocationRateLimitPerSecond()).success();
+            });
     }
 
     @WebLog
     @PostMapping("deleteApiDestination")
-    public DeleteApiDestinationResponse deleteApiDestination(@RequestBody 
DeleteApiDestinationRequest deleteApiDestinationRequest) {
-        final Set<ConstraintViolation<DeleteApiDestinationRequest>> validate = 
validator.validate(deleteApiDestinationRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new 
DeleteApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
-        }
-        
apiDestinationService.deleteApiDestination(accountAPI.getResourceOwnerAccountId(),
 deleteApiDestinationRequest.getApiDestinationName());
-        return new DeleteApiDestinationResponse().success();
+    public Mono<DeleteApiDestinationResponse> deleteApiDestination(
+        @RequestBody DeleteApiDestinationRequest deleteApiDestinationRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<DeleteApiDestinationRequest>> 
validate = validator.validate(
+                    deleteApiDestinationRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new 
DeleteApiDestinationResponse().parameterCheckFailRes(errMessage.toString());
+                }
+                
apiDestinationService.deleteApiDestination(accountAPI.getResourceOwnerAccountId(ctx),
+                    deleteApiDestinationRequest.getApiDestinationName());
+                return new DeleteApiDestinationResponse().success();
+            });
     }
 
     @WebLog
     @PostMapping("listApiDestinations")
-    public ListApiDestinationsResponse listApiDestinations(@RequestBody 
ListApiDestinationsRequest listApiDestinationsRequest) {
-        final Set<ConstraintViolation<ListApiDestinationsRequest>> validate = 
validator.validate(listApiDestinationsRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new ListApiDestinationsResponse(null, null, null, 
0).parameterCheckFailRes(errMessage.toString());
-        }
-        final PaginationResult<List<ApiDestinationDTO>> listPaginationResult = 
apiDestinationService.listApiDestinations(accountAPI.getResourceOwnerAccountId(),
-                listApiDestinationsRequest.getApiDestinationNamePrefix(), 
listApiDestinationsRequest.getNextToken(), 
listApiDestinationsRequest.getMaxResults());
-        List<ApiDestinationsResponse> apiDestinationsResponses = 
Lists.newArrayList();
-        listPaginationResult.getData()
-                .forEach(eventApiDestination -> {
-                    ApiDestinationsResponse apiDestinationsResponse = new 
ApiDestinationsResponse();
-                    BeanUtils.copyProperties(eventApiDestination, 
apiDestinationsResponse);
-                    
apiDestinationsResponse.setApiDestinationName(eventApiDestination.getName());
-                    
apiDestinationsResponse.setHttpApiParameters(eventApiDestination.getApiParams());
-                    apiDestinationsResponses.add(apiDestinationsResponse);
-                });
-        return new ListApiDestinationsResponse(apiDestinationsResponses, 
listPaginationResult.getNextToken(), listPaginationResult.getTotal(), 
listApiDestinationsRequest.getMaxResults()).success();
+    public Mono<ListApiDestinationsResponse> listApiDestinations(
+        @RequestBody ListApiDestinationsRequest listApiDestinationsRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<ListApiDestinationsRequest>> 
validate = validator.validate(
+                    listApiDestinationsRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new ListApiDestinationsResponse(null, null, null, 
0).parameterCheckFailRes(
+                        errMessage.toString());
+                }
+                final PaginationResult<List<ApiDestinationDTO>> 
listPaginationResult
+                    = 
apiDestinationService.listApiDestinations(accountAPI.getResourceOwnerAccountId(ctx),
+                    listApiDestinationsRequest.getApiDestinationNamePrefix(), 
listApiDestinationsRequest.getNextToken(),
+                    listApiDestinationsRequest.getMaxResults());
+                List<ApiDestinationsResponse> apiDestinationsResponses = 
Lists.newArrayList();
+                listPaginationResult.getData()
+                    .forEach(eventApiDestination -> {
+                        ApiDestinationsResponse apiDestinationsResponse = new 
ApiDestinationsResponse();
+                        BeanUtils.copyProperties(eventApiDestination, 
apiDestinationsResponse);
+                        
apiDestinationsResponse.setApiDestinationName(eventApiDestination.getName());
+                        
apiDestinationsResponse.setHttpApiParameters(eventApiDestination.getApiParams());
+                        apiDestinationsResponses.add(apiDestinationsResponse);
+                    });
+                return new 
ListApiDestinationsResponse(apiDestinationsResponses, 
listPaginationResult.getNextToken(),
+                    listPaginationResult.getTotal(), 
listApiDestinationsRequest.getMaxResults()).success();
+            });
     }
 
-    private ApiDestinationDTO getEventApiDestination(HttpApiParameters 
apiParams, String description, String connectionName, Integer 
invocationRateLimitPerSecond, String name, AccountAPI accountAPI) {
+    private ApiDestinationDTO getEventApiDestination(HttpApiParameters 
apiParams, String description,
+        String connectionName, Integer invocationRateLimitPerSecond, String 
name, AccountAPI accountAPI, Context ctx) {
         ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO();
         apiDestinationDTO.setApiParams(apiParams);
         apiDestinationDTO.setDescription(description);
         apiDestinationDTO.setConnectionName(connectionName);
         
apiDestinationDTO.setInvocationRateLimitPerSecond(invocationRateLimitPerSecond);
         apiDestinationDTO.setName(name);
-        apiDestinationDTO.setAccountId(accountAPI.getResourceOwnerAccountId());
+        
apiDestinationDTO.setAccountId(accountAPI.getResourceOwnerAccountId(ctx));
         return apiDestinationDTO;
     }
 }
diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
index 357b2df..63be555 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionController.java
@@ -17,6 +17,14 @@
 
 package org.apache.rocketmq.eventbridge.adapter.api.controller;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Resource;
+import javax.validation.ConstraintViolation;
+import javax.validation.Validator;
+
 import com.google.common.collect.Lists;
 import org.apache.rocketmq.eventbridge.adapter.api.annotations.WebLog;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.BaseRequest;
@@ -35,8 +43,8 @@ import 
org.apache.rocketmq.eventbridge.adapter.api.dto.connection.UpdateConnecti
 import 
org.apache.rocketmq.eventbridge.domain.common.enums.AuthorizationTypeEnum;
 import org.apache.rocketmq.eventbridge.domain.common.enums.NetworkTypeEnum;
 import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
-import 
org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService;
 import org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionDTO;
+import 
org.apache.rocketmq.eventbridge.domain.model.connection.ConnectionService;
 import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
 import org.springframework.beans.BeanUtils;
 import org.springframework.util.CollectionUtils;
@@ -44,14 +52,8 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
-
-import javax.annotation.Resource;
-import javax.validation.ConstraintViolation;
-import javax.validation.Validator;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
 
 @RestController
 @RequestMapping("/connection/")
@@ -66,85 +68,129 @@ public class ConnectionController {
 
     @WebLog
     @PostMapping("createConnection")
-    public CreateConnectionResponse createConnection(@RequestBody 
CreateConnectionRequest createConnectionRequest) {
-        final Set<ConstraintViolation<CreateConnectionRequest>> validate = 
validator.validate(createConnectionRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new 
CreateConnectionResponse(null).parameterCheckFailRes(errMessage.toString());
-        }
-        ConnectionDTO connectionDTO = 
getEventConnectionWithBLOBs(createConnectionRequest);
-        return new 
CreateConnectionResponse(connectionService.createConnection(connectionDTO)).success();
+    public Mono<CreateConnectionResponse> createConnection(
+        @RequestBody CreateConnectionRequest createConnectionRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<CreateConnectionRequest>> 
validate = validator.validate(
+                    createConnectionRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new 
CreateConnectionResponse(null).parameterCheckFailRes(errMessage.toString());
+                }
+                ConnectionDTO connectionDTO = getEventConnectionWithBLOBs(ctx, 
createConnectionRequest);
+                return new 
CreateConnectionResponse(connectionService.createConnection(connectionDTO)).success();
+            });
     }
 
     @WebLog
     @PostMapping("deleteConnection")
-    public DeleteConnectionResponse deleteConnection(@RequestBody 
DeleteConnectionRequest deleteConnectionRequest) {
-        final Set<ConstraintViolation<DeleteConnectionRequest>> validate = 
validator.validate(deleteConnectionRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new 
DeleteConnectionResponse().parameterCheckFailRes(errMessage.toString());
-        }
-        
connectionService.deleteConnection(accountAPI.getResourceOwnerAccountId(), 
deleteConnectionRequest.getConnectionName());
-        return new DeleteConnectionResponse().success();
+    public Mono<DeleteConnectionResponse> deleteConnection(
+        @RequestBody DeleteConnectionRequest deleteConnectionRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<DeleteConnectionRequest>> 
validate = validator.validate(
+                    deleteConnectionRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new 
DeleteConnectionResponse().parameterCheckFailRes(errMessage.toString());
+                }
+                
connectionService.deleteConnection(accountAPI.getResourceOwnerAccountId(ctx),
+                    deleteConnectionRequest.getConnectionName());
+                return new DeleteConnectionResponse().success();
+            });
     }
 
     @WebLog
     @PostMapping("updateConnection")
-    public UpdateConnectionResponse updateConnection(@RequestBody 
UpdateConnectionRequest updateConnectionRequest) {
-        final Set<ConstraintViolation<UpdateConnectionRequest>> validate = 
validator.validate(updateConnectionRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new 
UpdateConnectionResponse().parameterCheckFailRes(errMessage.toString());
-        }
-        ConnectionDTO connectionDTO = 
getEventConnectionWithBLOBs(updateConnectionRequest);
-        connectionService.updateConnection(connectionDTO, 
accountAPI.getResourceOwnerAccountId());
-        return new UpdateConnectionResponse().success();
+    public Mono<UpdateConnectionResponse> updateConnection(
+        @RequestBody UpdateConnectionRequest updateConnectionRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<UpdateConnectionRequest>> 
validate = validator.validate(
+                    updateConnectionRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new 
UpdateConnectionResponse().parameterCheckFailRes(errMessage.toString());
+                }
+                ConnectionDTO connectionDTO = getEventConnectionWithBLOBs(ctx, 
updateConnectionRequest);
+                connectionService.updateConnection(connectionDTO, 
accountAPI.getResourceOwnerAccountId(ctx));
+                return new UpdateConnectionResponse().success();
+            });
     }
 
     @WebLog
     @PostMapping("getConnection")
-    public GetConnectionResponse getConnection(@RequestBody 
GetConnectionRequest getConnectionRequest) {
-        final Set<ConstraintViolation<GetConnectionRequest>> validate = 
validator.validate(getConnectionRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new GetConnectionResponse(null, null, null, 
null).parameterCheckFailRes(errMessage.toString());
-        }
-        final ConnectionDTO connectionDTO = 
connectionService.getConnection(accountAPI.getResourceOwnerAccountId(), 
getConnectionRequest.getConnectionName());
-        return new GetConnectionResponse(connectionDTO.getConnectionName(), 
connectionDTO.getDescription(), connectionDTO.getNetworkParameters(), 
connectionDTO.getAuthParameters()).success();
+    public Mono<GetConnectionResponse> getConnection(@RequestBody 
GetConnectionRequest getConnectionRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<GetConnectionRequest>> validate 
= validator.validate(
+                    getConnectionRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new GetConnectionResponse(null, null, null, 
null).parameterCheckFailRes(
+                        errMessage.toString());
+                }
+                final ConnectionDTO connectionDTO = 
connectionService.getConnection(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
getConnectionRequest.getConnectionName());
+                return new 
GetConnectionResponse(connectionDTO.getConnectionName(), 
connectionDTO.getDescription(),
+                    connectionDTO.getNetworkParameters(), 
connectionDTO.getAuthParameters()).success();
+            });
     }
 
     @WebLog
     @PostMapping("listConnections")
-    public ListConnectionResponse listConnections(@RequestBody 
ListConnectionRequest listConnectionRequest) {
-        final Set<ConstraintViolation<ListConnectionRequest>> validate = 
validator.validate(listConnectionRequest);
-        List<String> errMessage = 
validate.stream().map(ConstraintViolation::getMessage).collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(errMessage)) {
-            return new ListConnectionResponse(null, null, null, 
0).parameterCheckFailRes(errMessage.toString());
-        }
-        final PaginationResult<List<ConnectionDTO>> listPaginationResult = 
connectionService.listConnections(accountAPI.getResourceOwnerAccountId(),
-                listConnectionRequest.getConnectionNamePrefix(), 
listConnectionRequest.getNextToken(), listConnectionRequest.getMaxResults());
-        List<ConnectionResponse> connectionResponses = Lists.newArrayList();
-        listPaginationResult.getData()
-                .forEach(connectionDTO -> {
-                    ConnectionResponse connectionResponse = new 
ConnectionResponse();
-                    BeanUtils.copyProperties(connectionDTO, 
connectionResponse);
-                    connectionResponses.add(connectionResponse);
-                });
-        return new ListConnectionResponse(connectionResponses, 
listPaginationResult.getNextToken(), listPaginationResult.getTotal(), 
listConnectionRequest.getMaxResults()).success();
+    public Mono<ListConnectionResponse> listConnections(@RequestBody 
ListConnectionRequest listConnectionRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                final Set<ConstraintViolation<ListConnectionRequest>> validate 
= validator.validate(
+                    listConnectionRequest);
+                List<String> errMessage = validate.stream()
+                    .map(ConstraintViolation::getMessage)
+                    .collect(Collectors.toList());
+                if (!CollectionUtils.isEmpty(errMessage)) {
+                    return new ListConnectionResponse(null, null, null, 
0).parameterCheckFailRes(errMessage.toString());
+                }
+                final PaginationResult<List<ConnectionDTO>> 
listPaginationResult = connectionService.listConnections(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
listConnectionRequest.getConnectionNamePrefix(),
+                    listConnectionRequest.getNextToken(), 
listConnectionRequest.getMaxResults());
+                List<ConnectionResponse> connectionResponses = 
Lists.newArrayList();
+                listPaginationResult.getData()
+                    .forEach(connectionDTO -> {
+                        ConnectionResponse connectionResponse = new 
ConnectionResponse();
+                        BeanUtils.copyProperties(connectionDTO, 
connectionResponse);
+                        connectionResponses.add(connectionResponse);
+                    });
+                return new ListConnectionResponse(connectionResponses, 
listPaginationResult.getNextToken(),
+                    listPaginationResult.getTotal(), 
listConnectionRequest.getMaxResults()).success();
+            });
     }
 
     @PostMapping("listEnumsResponse")
-    public ListEnumsResponse listEnumsResponse() {
-        ListEnumsResponse listEnumsResponse = new ListEnumsResponse();
-        
listEnumsResponse.setAuthorizationTypeEnums(Arrays.stream(AuthorizationTypeEnum.values()).collect(Collectors.toList()));
-        
listEnumsResponse.setNetworkTypeEnums(Arrays.stream(NetworkTypeEnum.values()).collect(Collectors.toList()));
-        return listEnumsResponse.success();
+    public Mono<ListEnumsResponse> listEnumsResponse() {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                ListEnumsResponse listEnumsResponse = new ListEnumsResponse();
+                
listEnumsResponse.setAuthorizationTypeEnums(Arrays.stream(AuthorizationTypeEnum.values())
+                    .collect(Collectors.toList()));
+                
listEnumsResponse.setNetworkTypeEnums(Arrays.stream(NetworkTypeEnum.values())
+                    .collect(Collectors.toList()));
+                return listEnumsResponse.success();
+            });
     }
 
-    private ConnectionDTO getEventConnectionWithBLOBs(BaseRequest baseRequest) 
{
+    private ConnectionDTO getEventConnectionWithBLOBs(Context ctx, BaseRequest 
baseRequest) {
         ConnectionDTO connectionDTO = new ConnectionDTO();
         BeanUtils.copyProperties(baseRequest, connectionDTO);
-        connectionDTO.setAccountId(accountAPI.getResourceOwnerAccountId());
+        connectionDTO.setAccountId(accountAPI.getResourceOwnerAccountId(ctx));
         return connectionDTO;
     }
 
diff --git a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
index 5424b34..1af1404 100644
--- a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
+++ b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventBusController.java
@@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @RequestMapping("/bus/")
@@ -54,42 +55,54 @@ public class EventBusController {
     AccountAPI accountAPI;
 
     @PostMapping(value = {"createEventBus"})
-    public CreateEventBusResponse createEventBus(@RequestBody 
CreateEventBusRequest createEventBusRequest) {
-        eventBusService.createEventBus(accountAPI.getResourceOwnerAccountId(), 
createEventBusRequest.getEventBusName(),
-            createEventBusRequest.getDescription());
-        return new 
CreateEventBusResponse(createEventBusRequest.getEventBusName());
+    public Mono<CreateEventBusResponse> createEventBus(@RequestBody 
CreateEventBusRequest createEventBusRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventBusService.createEventBus(accountAPI.getResourceOwnerAccountId(ctx),
+                    createEventBusRequest.getEventBusName(), 
createEventBusRequest.getDescription());
+                return new 
CreateEventBusResponse(createEventBusRequest.getEventBusName());
+            });
     }
 
     @PostMapping(value = {"getEventBus"})
-    public GetEventBusResponse getEventBus(@RequestBody GetEventBusRequest 
getEventBusRequest) {
-        EventBus eventbus = 
eventBusService.getEventBus(accountAPI.getResourceOwnerAccountId(),
-            getEventBusRequest.getEventBusName());
-        return new GetEventBusResponse(eventbus.getName(), 
eventbus.getDescription(), eventbus.getGmtCreate()
-            .getTime());
+    public Mono<GetEventBusResponse> getEventBus(@RequestBody 
GetEventBusRequest getEventBusRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventBus eventbus = 
eventBusService.getEventBus(accountAPI.getResourceOwnerAccountId(ctx),
+                    getEventBusRequest.getEventBusName());
+                return new GetEventBusResponse(eventbus.getName(), 
eventbus.getDescription(), eventbus.getGmtCreate()
+                    .getTime());
+            });
     }
 
     @PostMapping(value = {"deleteEventBus"})
-    public DeleteEventBusResponse deleteEventBus(@RequestBody 
DeleteEventBusRequest deleteEventBusRequest) {
-        
eventBusDomainService.deleteEventBusCheckDependencies(accountAPI.getResourceOwnerAccountId(),
-            deleteEventBusRequest.getEventBusName());
-        return new DeleteEventBusResponse();
+    public Mono<DeleteEventBusResponse> deleteEventBus(@RequestBody 
DeleteEventBusRequest deleteEventBusRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventBusDomainService.deleteEventBusCheckDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+                    deleteEventBusRequest.getEventBusName());
+                return new DeleteEventBusResponse();
+            });
     }
 
     @PostMapping(value = {"listEventBuses"})
-    public ListEventBusesResponse listEventBuses(@RequestBody 
ListEventBusesRequest listEventBusesRequest) {
-        PaginationResult<List<EventBus>> paginationResult = 
eventBusService.listEventBuses(
-            accountAPI.getResourceOwnerAccountId(), 
listEventBusesRequest.getNextToken(),
-            listEventBusesRequest.getMaxResults());
-        List<EventBusDTO> eventBuses = Lists.newArrayList();
-        paginationResult.getData()
-            .forEach(eventBus -> {
-                EventBusDTO eventBusDTO = new EventBusDTO();
-                eventBusDTO.setEventBusName(eventBusDTO.getEventBusName());
-                eventBusDTO.setDescription(eventBus.getDescription());
-                eventBuses.add(eventBusDTO);
+    public Mono<ListEventBusesResponse> listEventBuses(@RequestBody 
ListEventBusesRequest listEventBusesRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                PaginationResult<List<EventBus>> paginationResult = 
eventBusService.listEventBuses(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
listEventBusesRequest.getNextToken(),
+                    listEventBusesRequest.getMaxResults());
+                List<EventBusDTO> eventBuses = Lists.newArrayList();
+                paginationResult.getData()
+                    .forEach(eventBus -> {
+                        EventBusDTO eventBusDTO = new EventBusDTO();
+                        
eventBusDTO.setEventBusName(eventBus.getName()); // read from the domain object, not the new DTO
+                        eventBusDTO.setDescription(eventBus.getDescription());
+                        eventBuses.add(eventBusDTO);
+                    });
+                return new ListEventBusesResponse(eventBuses, 
paginationResult.getNextToken(),
+                    paginationResult.getTotal(), 
listEventBusesRequest.getMaxResults());
             });
-        return new ListEventBusesResponse(eventBuses, 
paginationResult.getNextToken(), paginationResult.getTotal(),
-            listEventBusesRequest.getMaxResults());
     }
 
 }
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
index 1c7b7b6..68b1f64 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
@@ -58,21 +58,25 @@ public class EventDataController {
 
     @PostMapping(value = {"putEvents"})
     public Mono<PutEventsResponse> putEvents(@RequestHeader Map<String, 
String> headers, @RequestBody byte[] body) {
-        List<CloudEvent> cloudEvents = 
eventConverterAdapter.toEventsRequest(headers, body);
-        List<EventBridgeEvent> eventList = 
this.converterEventBridgeEvent(cloudEvents);
-        return 
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
+        return Mono.subscriberContext()
+            .flatMap(ctx -> {
+                List<CloudEvent> cloudEvents = 
eventConverterAdapter.toEventsRequest(headers, body);
+                List<EventBridgeEvent> eventList = 
this.converterEventBridgeEvent(cloudEvents);
+                return 
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(ctx), 
eventList);
+            });
     }
 
     @RequestMapping(value = {"webhook/putEvents"})
     public Mono<PutEventsResponse> putHttpEvents(ServerWebExchange 
serverWebExchange,
-                                                 @RequestHeader Map<String, 
String> headers,
-                                                 @RequestBody byte[] body,
-                                                 @RequestParam("token") String 
token) {
-        ServerHttpRequest request = serverWebExchange.getRequest();
-        List<CloudEvent> cloudEvents = 
httpEventConverter.toEventBridgeEvent(request, body,
-                headers, accountAPI.getResourceOwnerAccountId(), token);
-        List<EventBridgeEvent> eventList = 
this.converterEventBridgeEvent(cloudEvents);
-        return 
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
+        @RequestHeader Map<String, String> headers, @RequestBody byte[] body, 
@RequestParam("token") String token) {
+        return Mono.subscriberContext()
+            .flatMap(ctx -> {
+                ServerHttpRequest request = serverWebExchange.getRequest();
+                List<CloudEvent> cloudEvents = 
httpEventConverter.toEventBridgeEvent(request, body, headers,
+                    accountAPI.getResourceOwnerAccountId(ctx), token);
+                List<EventBridgeEvent> eventList = 
this.converterEventBridgeEvent(cloudEvents);
+                return 
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(ctx), 
eventList);
+            });
     }
 
     private List<EventBridgeEvent> converterEventBridgeEvent(List<CloudEvent> 
cloudEvents) {
@@ -91,7 +95,8 @@ public class EventDataController {
                 .dataschema(cloudEvent.getDataSchema())
                 .datacontenttype(cloudEvent.getDataContentType())
                 .time(cloudEvent.getTime())
-                .data(cloudEvent.getData().toBytes())
+                .data(cloudEvent.getData()
+                    .toBytes())
                 .build();
             if (cloudEvent.getExtensionNames() != null) {
                 cloudEvent.getExtensionNames()
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
index 2244f0f..c09b5f7 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventRuleController.java
@@ -48,6 +48,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @RequestMapping("/rule/")
@@ -63,83 +64,105 @@ public class EventRuleController {
     AccountAPI accountAPI;
 
     @PostMapping(value = {"createEventRule"})
-    public CreateRuleResponse createRule(@RequestBody CreateRuleRequest 
createRuleRequest) {
-        
eventRuleService.createEventRule(accountAPI.getResourceOwnerAccountId(), 
createRuleRequest.getEventBusName(),
-            createRuleRequest.getEventRuleName(), 
createRuleRequest.getDescription(),
-            createRuleRequest.getFilterPattern());
-        return new CreateRuleResponse(createRuleRequest.getEventRuleName());
+    public Mono<CreateRuleResponse> createRule(@RequestBody CreateRuleRequest 
createRuleRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventRuleService.createEventRule(accountAPI.getResourceOwnerAccountId(ctx),
+                    createRuleRequest.getEventBusName(), 
createRuleRequest.getEventRuleName(),
+                    createRuleRequest.getDescription(), 
createRuleRequest.getFilterPattern());
+                return new 
CreateRuleResponse(createRuleRequest.getEventRuleName());
+            });
     }
 
     @PostMapping(value = {"getEventRule"})
-    public GetRuleResponse getRule(@RequestBody GetRuleRequest getRuleRequest) 
{
-        EventRuleDetail eventRuleDetail = 
eventRuleDomainService.getEventRuleDetail(
-            accountAPI.getResourceOwnerAccountId(), 
getRuleRequest.getEventBusName(),
-            getRuleRequest.getEventRuleName());
-
-        List<EventTargetDTO> eventTargets = 
EventTargetDTOConverter.convert(eventRuleDetail.getEventTargets());
-        GetRuleResponse getRuleResponse = GetRuleResponse.builder()
-            .eventBusName(eventRuleDetail.getEventBusName())
-            .eventRuleName(eventRuleDetail.getName())
-            .description(eventRuleDetail.getDescription())
-            .filterPattern(eventRuleDetail.getFilterPattern())
-            .status(eventRuleDetail.getStatus())
-            .gmtCreate(eventRuleDetail.getGmtCreate())
-            .gmtModify(eventRuleDetail.getGmtModify())
-            .eventTargets(eventTargets)
-            .build();
-        return getRuleResponse;
+    public Mono<GetRuleResponse> getRule(@RequestBody GetRuleRequest 
getRuleRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventRuleDetail eventRuleDetail = 
eventRuleDomainService.getEventRuleDetail(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
getRuleRequest.getEventBusName(),
+                    getRuleRequest.getEventRuleName());
+
+                List<EventTargetDTO> eventTargets = 
EventTargetDTOConverter.convert(eventRuleDetail.getEventTargets());
+                GetRuleResponse getRuleResponse = GetRuleResponse.builder()
+                    .eventBusName(eventRuleDetail.getEventBusName())
+                    .eventRuleName(eventRuleDetail.getName())
+                    .description(eventRuleDetail.getDescription())
+                    .filterPattern(eventRuleDetail.getFilterPattern())
+                    .status(eventRuleDetail.getStatus())
+                    .gmtCreate(eventRuleDetail.getGmtCreate())
+                    .gmtModify(eventRuleDetail.getGmtModify())
+                    .eventTargets(eventTargets)
+                    .build();
+                return getRuleResponse;
+            });
     }
 
     @PostMapping(value = {"deleteEventRule"})
-    public DeleteRuleResponse deleteRule(@RequestBody DeleteRuleRequest 
deleteRuleRequest) {
-        
eventRuleDomainService.deleteEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
-            deleteRuleRequest.getEventBusName(), 
deleteRuleRequest.getEventRuleName());
-        return new DeleteRuleResponse();
+    public Mono<DeleteRuleResponse> deleteRule(@RequestBody DeleteRuleRequest 
deleteRuleRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventRuleDomainService.deleteEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+                    deleteRuleRequest.getEventBusName(), 
deleteRuleRequest.getEventRuleName());
+                return new DeleteRuleResponse();
+            });
     }
 
     @PostMapping(value = {"updateEventRule"})
-    public UpdateRuleResponse updateRule(@RequestBody UpdateRuleRequest 
updateRuleRequest) {
-        
eventRuleDomainService.updateEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
-            updateRuleRequest.getEventBusName(), 
updateRuleRequest.getEventRuleName(),
-            updateRuleRequest.getDescription(), 
updateRuleRequest.getFilterPattern());
-        return new UpdateRuleResponse();
+    public Mono<UpdateRuleResponse> updateRule(@RequestBody UpdateRuleRequest 
updateRuleRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventRuleDomainService.updateEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+
+                    updateRuleRequest.getEventBusName(), 
updateRuleRequest.getEventRuleName(),
+                    updateRuleRequest.getDescription(), 
updateRuleRequest.getFilterPattern());
+                return new UpdateRuleResponse();
+            });
     }
 
     @PostMapping(value = {"listEventRules"})
-    public ListRulesResponse listRules(@RequestBody ListRulesRequest 
listRulesRequest) {
-        PaginationResult<List<EventRule>> paginationResult = 
eventRuleService.listEventRules(
-            accountAPI.getResourceOwnerAccountId(), 
listRulesRequest.getEventBusName(), listRulesRequest.getNextToken(),
-            listRulesRequest.getMaxResults());
-        List<EventRuleDTO> eventRules = Lists.newArrayList();
-        paginationResult.getData()
-            .forEach(eventRule -> {
-                EventRuleDTO eventRuleDTO = EventRuleDTO.builder()
-                    .eventBusName(eventRule.getEventBusName())
-                    .eventRuleName(eventRule.getName())
-                    .description(eventRule.getDescription())
-                    .filterPattern(eventRule.getFilterPattern())
-                    .status(eventRule.getStatus())
-                    .gmtCreate(eventRule.getGmtCreate())
-                    .gmtModify(eventRule.getGmtModify())
-                    .build();
-                eventRules.add(eventRuleDTO);
+    public Mono<ListRulesResponse> listRules(@RequestBody ListRulesRequest 
listRulesRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                PaginationResult<List<EventRule>> paginationResult = 
eventRuleService.listEventRules(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
listRulesRequest.getEventBusName(),
+                    listRulesRequest.getNextToken(), 
listRulesRequest.getMaxResults());
+                List<EventRuleDTO> eventRules = Lists.newArrayList();
+                paginationResult.getData()
+                    .forEach(eventRule -> {
+                        EventRuleDTO eventRuleDTO = EventRuleDTO.builder()
+                            .eventBusName(eventRule.getEventBusName())
+                            .eventRuleName(eventRule.getName())
+                            .description(eventRule.getDescription())
+                            .filterPattern(eventRule.getFilterPattern())
+                            .status(eventRule.getStatus())
+                            .gmtCreate(eventRule.getGmtCreate())
+                            .gmtModify(eventRule.getGmtModify())
+                            .build();
+                        eventRules.add(eventRuleDTO);
+                    });
+                return new ListRulesResponse(eventRules, 
paginationResult.getNextToken(), paginationResult.getTotal(),
+                    listRulesRequest.getMaxResults());
             });
-        return new ListRulesResponse(eventRules, 
paginationResult.getNextToken(), paginationResult.getTotal(),
-            listRulesRequest.getMaxResults());
     }
 
     @PostMapping(value = {"enableEventRule"})
-    public EnableRuleResponse enableRule(@RequestBody EnableRuleRequest 
enableRuleRequest) {
-        
eventRuleDomainService.enableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
-            enableRuleRequest.getEventBusName(), 
enableRuleRequest.getEventRuleName());
-        return new EnableRuleResponse();
+    public Mono<EnableRuleResponse> enableRule(@RequestBody EnableRuleRequest 
enableRuleRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventRuleDomainService.enableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+                    enableRuleRequest.getEventBusName(), 
enableRuleRequest.getEventRuleName());
+                return new EnableRuleResponse();
+            });
     }
 
     @PostMapping(value = {"disableEventRule"})
-    public DisableRuleResponse disableRule(@RequestBody DisableRuleRequest 
disableRuleRequest) {
-        
eventRuleDomainService.disableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(),
-            disableRuleRequest.getEventBusName(), 
disableRuleRequest.getEventRuleName());
-        return new DisableRuleResponse();
+    public Mono<DisableRuleResponse> disableRule(@RequestBody 
DisableRuleRequest disableRuleRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventRuleDomainService.disableEventRuleWithDependencies(accountAPI.getResourceOwnerAccountId(ctx),
+                    disableRuleRequest.getEventBusName(), 
disableRuleRequest.getEventRuleName());
+                return new DisableRuleResponse();
+            });
     }
 
 }
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
index 7ad1e0a..d073be9 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
@@ -42,6 +42,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @RequestMapping("/source/")
@@ -54,69 +55,88 @@ public class EventSourceController {
     EventSourceServiceFactory eventSourceServiceFactory;
 
     @PostMapping(value = {"createEventSource"})
-    public CreateEventSourceResponse createEventSource(@RequestBody 
CreateEventSourceRequest createEventSourceRequest) {
-        EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
-            EventSourceTypeEnum.USER_DEFINED.name(), 
createEventSourceRequest.getClassName());
-        
eventSourceService.createEventSource(accountAPI.getResourceOwnerAccountId(),
-            createEventSourceRequest.getEventBusName(), 
createEventSourceRequest.getEventSourceName(),
-            createEventSourceRequest.getDescription(), 
createEventSourceRequest.getClassName(),
-            createEventSourceRequest.getConfig());
-        return new 
CreateEventSourceResponse(createEventSourceRequest.getEventSourceName());
+    public Mono<CreateEventSourceResponse> createEventSource(
+        @RequestBody CreateEventSourceRequest createEventSourceRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
+                    EventSourceTypeEnum.USER_DEFINED.name(), 
createEventSourceRequest.getClassName());
+                
eventSourceService.createEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+                    createEventSourceRequest.getEventBusName(), 
createEventSourceRequest.getEventSourceName(),
+                    createEventSourceRequest.getDescription(), 
createEventSourceRequest.getClassName(),
+                    createEventSourceRequest.getConfig());
+                return new 
CreateEventSourceResponse(createEventSourceRequest.getEventSourceName());
+            });
     }
 
     @PostMapping(value = {"updateEventSource"})
-    public UpdateEventSourceResponse updateEventSource(@RequestBody 
UpdateEventSourceRequest updateEventSourceRequest) {
-        EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
-            accountAPI.getResourceOwnerAccountId(), 
updateEventSourceRequest.getEventBusName(),
-            updateEventSourceRequest.getEventSourceName());
-        
eventSourceService.updateEventSource(accountAPI.getResourceOwnerAccountId(),
-            updateEventSourceRequest.getEventBusName(), 
updateEventSourceRequest.getEventSourceName(),
-            updateEventSourceRequest.getDescription(), 
updateEventSourceRequest.getClassName(),
-            updateEventSourceRequest.getStatus(), 
updateEventSourceRequest.getConfig());
-        return new UpdateEventSourceResponse();
+    public Mono<UpdateEventSourceResponse> updateEventSource(
+        @RequestBody UpdateEventSourceRequest updateEventSourceRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
updateEventSourceRequest.getEventBusName(),
+                    updateEventSourceRequest.getEventSourceName());
+                
eventSourceService.updateEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+                    updateEventSourceRequest.getEventBusName(), 
updateEventSourceRequest.getEventSourceName(),
+                    updateEventSourceRequest.getDescription(), 
updateEventSourceRequest.getClassName(),
+                    updateEventSourceRequest.getStatus(), 
updateEventSourceRequest.getConfig());
+                return new UpdateEventSourceResponse();
+            });
     }
 
     @PostMapping(value = {"deleteEventSource"})
-    public DeleteEventSourceResponse deleteEventSource(@RequestBody 
DeleteEventSourceRequest deleteEventSourceRequest) {
-        EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
-            accountAPI.getResourceOwnerAccountId(), 
deleteEventSourceRequest.getEventBusName(),
-            deleteEventSourceRequest.getEventSourceName());
-        
eventSourceService.deleteEventSource(accountAPI.getResourceOwnerAccountId(),
-            deleteEventSourceRequest.getEventBusName(), 
deleteEventSourceRequest.getEventSourceName());
-        return new DeleteEventSourceResponse();
+    public Mono<DeleteEventSourceResponse> deleteEventSource(
+        @RequestBody DeleteEventSourceRequest deleteEventSourceRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
deleteEventSourceRequest.getEventBusName(),
+                    deleteEventSourceRequest.getEventSourceName());
+                
eventSourceService.deleteEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+                    deleteEventSourceRequest.getEventBusName(), 
deleteEventSourceRequest.getEventSourceName());
+                return new DeleteEventSourceResponse();
+            });
     }
 
     @PostMapping(value = {"getEventSource"})
-    public GetEventSourceResponse getEventSource(@RequestBody 
GetEventSourceRequest getEventSourceRequest) {
-        EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
-            accountAPI.getResourceOwnerAccountId(), 
getEventSourceRequest.getEventBusName(),
-            getEventSourceRequest.getEventSourceName());
-        EventSource eventSource = 
eventSourceService.getEventSource(accountAPI.getResourceOwnerAccountId(),
-            getEventSourceRequest.getEventBusName(), 
getEventSourceRequest.getEventSourceName());
-        return new GetEventSourceResponse(eventSource.getEventBusName(), 
eventSource.getName(),
-            eventSource.getDescription(), eventSource.getClassName(), 
eventSource.getConfig());
+    public Mono<GetEventSourceResponse> getEventSource(@RequestBody 
GetEventSourceRequest getEventSourceRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventSourceService eventSourceService = 
eventSourceServiceFactory.getEventSourceService(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
getEventSourceRequest.getEventBusName(),
+                    getEventSourceRequest.getEventSourceName());
+                EventSource eventSource = 
eventSourceService.getEventSource(accountAPI.getResourceOwnerAccountId(ctx),
+                    getEventSourceRequest.getEventBusName(), 
getEventSourceRequest.getEventSourceName());
+                return new 
GetEventSourceResponse(eventSource.getEventBusName(), eventSource.getName(),
+                    eventSource.getDescription(), eventSource.getClassName(), 
eventSource.getConfig());
+            });
     }
 
     @PostMapping(value = {"listEventSources"})
-    public ListEventSourcesResponse listEventSources(@RequestBody 
ListEventSourcesRequest listEventSourcesRequest) {
-        EventSourceService eventSourceService = 
eventSourceServiceFactory.getDefaultEventSourceService();
-        PaginationResult<List<EventSource>> paginationResult = 
eventSourceService.listEventSources(
-            accountAPI.getResourceOwnerAccountId(), 
listEventSourcesRequest.getEventBusName(),
-            listEventSourcesRequest.getNextToken(), 
listEventSourcesRequest.getMaxResults());
-        List<EventSourceDTO> eventSourceDTOS = Lists.newArrayList();
-        paginationResult.getData()
-            .forEach(eventSource -> {
-                EventSourceDTO eventSourceDTO = EventSourceDTO.builder()
-                    .eventBusName(eventSource.getEventBusName())
-                    .eventSourceName(eventSource.getName())
-                    .description(eventSource.getDescription())
-                    .gmtCreate(eventSource.getGmtCreate())
-                    .gmtModify(eventSource.getGmtModify())
-                    .build();
-                eventSourceDTOS.add(eventSourceDTO);
+    public Mono<ListEventSourcesResponse> listEventSources(
+        @RequestBody ListEventSourcesRequest listEventSourcesRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                EventSourceService eventSourceService = 
eventSourceServiceFactory.getDefaultEventSourceService();
+                PaginationResult<List<EventSource>> paginationResult = 
eventSourceService.listEventSources(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
listEventSourcesRequest.getEventBusName(),
+                    listEventSourcesRequest.getNextToken(), 
listEventSourcesRequest.getMaxResults());
+                List<EventSourceDTO> eventSourceDTOS = Lists.newArrayList();
+                paginationResult.getData()
+                    .forEach(eventSource -> {
+                        EventSourceDTO eventSourceDTO = 
EventSourceDTO.builder()
+                            .eventBusName(eventSource.getEventBusName())
+                            .eventSourceName(eventSource.getName())
+                            .description(eventSource.getDescription())
+                            .gmtCreate(eventSource.getGmtCreate())
+                            .gmtModify(eventSource.getGmtModify())
+                            .build();
+                        eventSourceDTOS.add(eventSourceDTO);
+                    });
+                return new ListEventSourcesResponse(eventSourceDTOS, 
paginationResult.getNextToken(),
+                    paginationResult.getTotal(), 
listEventSourcesRequest.getMaxResults());
             });
-        return new ListEventSourcesResponse(eventSourceDTOS, 
paginationResult.getNextToken(),
-            paginationResult.getTotal(), 
listEventSourcesRequest.getMaxResults());
     }
 
 }
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
index d8607a8..313d99f 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTargetController.java
@@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @RequestMapping("/target/")
@@ -51,44 +52,57 @@ public class EventTargetController {
     AccountAPI accountAPI;
 
     @PostMapping(value = {"createEventTargets"})
-    public CreateTargetsResponse createTargets(@RequestBody 
CreateTargetsRequest createTargetsRequest) {
-        List<EventTarget> eventTargetList = 
EventTargetConverter.convertEventTargets(
-            accountAPI.getResourceOwnerAccountId(), 
createTargetsRequest.getEventBusName(),
-            createTargetsRequest.getEventRuleName(), 
createTargetsRequest.getEventTargets());
-        
eventTargetService.createTargets(accountAPI.getResourceOwnerAccountId(), 
createTargetsRequest.getEventBusName(),
-            createTargetsRequest.getEventRuleName(), eventTargetList);
-        return new CreateTargetsResponse();
+    public Mono<CreateTargetsResponse> createTargets(@RequestBody 
CreateTargetsRequest createTargetsRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                List<EventTarget> eventTargetList = 
EventTargetConverter.convertEventTargets(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
createTargetsRequest.getEventBusName(),
+                    createTargetsRequest.getEventRuleName(), 
createTargetsRequest.getEventTargets());
+                
eventTargetService.createTargets(accountAPI.getResourceOwnerAccountId(ctx),
+                    createTargetsRequest.getEventBusName(), 
createTargetsRequest.getEventRuleName(), eventTargetList);
+                return new CreateTargetsResponse();
+            });
     }
 
     @PostMapping(value = {"updateEventTargets"})
-    public UpdateTargetsResponse updateTargets(@RequestBody 
UpdateTargetsRequest updateTargetsRequest) {
-        List<EventTarget> eventTargetList = 
EventTargetConverter.convertEventTargets(
-            accountAPI.getResourceOwnerAccountId(), 
updateTargetsRequest.getEventBusName(),
-            updateTargetsRequest.getEventRuleName(), 
updateTargetsRequest.getEventTargets());
-        
eventTargetService.updateTargets(accountAPI.getResourceOwnerAccountId(), 
updateTargetsRequest.getEventBusName(),
-            updateTargetsRequest.getEventRuleName(), eventTargetList);
-        return new UpdateTargetsResponse();
+    public Mono<UpdateTargetsResponse> updateTargets(@RequestBody 
UpdateTargetsRequest updateTargetsRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                List<EventTarget> eventTargetList = 
EventTargetConverter.convertEventTargets(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
updateTargetsRequest.getEventBusName(),
+                    updateTargetsRequest.getEventRuleName(), 
updateTargetsRequest.getEventTargets());
+                
eventTargetService.updateTargets(accountAPI.getResourceOwnerAccountId(ctx),
+                    updateTargetsRequest.getEventBusName(), 
updateTargetsRequest.getEventRuleName(), eventTargetList);
+                return new UpdateTargetsResponse();
+            });
     }
 
     @PostMapping(value = {"deleteEventTargets"})
-    public DeleteTargetsResponse deleteTargets(@RequestBody 
DeleteTargetsRequest deleteTargetsRequest) {
-        
eventTargetService.deleteTargets(accountAPI.getResourceOwnerAccountId(), 
deleteTargetsRequest.getEventBusName(),
-            deleteTargetsRequest.getEventRuleName(), 
deleteTargetsRequest.getEventTargetNames());
-        return new DeleteTargetsResponse();
+    public Mono<DeleteTargetsResponse> deleteTargets(@RequestBody 
DeleteTargetsRequest deleteTargetsRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                
eventTargetService.deleteTargets(accountAPI.getResourceOwnerAccountId(ctx),
+                    deleteTargetsRequest.getEventBusName(), 
deleteTargetsRequest.getEventRuleName(),
+                    deleteTargetsRequest.getEventTargetNames());
+                return new DeleteTargetsResponse();
+            });
     }
 
     @PostMapping(value = {"listEventTargets"})
-    public ListTargetsResponse listEventTargets(@RequestBody 
ListTargetsRequest listTargetsRequest) {
-        List<EventTarget> eventTargetRunnerList = 
eventTargetService.listTargets(accountAPI.getResourceOwnerAccountId(),
-            listTargetsRequest.getEventBusName(), 
listTargetsRequest.getEventRuleName());
-        List<EventTargetDTO> eventTargets = eventTargetRunnerList.stream()
-            .map(entry -> EventTargetDTOConverter.convert(entry))
-            .collect(Collectors.toList());
-        ListTargetsResponse listTargetsResponse = new ListTargetsResponse();
-        
listTargetsResponse.setEventBusName(listTargetsRequest.getEventBusName());
-        
listTargetsResponse.setEventRuleName(listTargetsRequest.getEventRuleName());
-        listTargetsResponse.setEventTargets(eventTargets);
-        return listTargetsResponse;
+    public Mono<ListTargetsResponse> listEventTargets(@RequestBody 
ListTargetsRequest listTargetsRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                List<EventTarget> eventTargetRunnerList = 
eventTargetService.listTargets(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
listTargetsRequest.getEventBusName(),
+                    listTargetsRequest.getEventRuleName());
+                List<EventTargetDTO> eventTargets = 
eventTargetRunnerList.stream()
+                    .map(entry -> EventTargetDTOConverter.convert(entry))
+                    .collect(Collectors.toList());
+                ListTargetsResponse listTargetsResponse = new 
ListTargetsResponse();
+                
listTargetsResponse.setEventBusName(listTargetsRequest.getEventBusName());
+                
listTargetsResponse.setEventRuleName(listTargetsRequest.getEventRuleName());
+                listTargetsResponse.setEventTargets(eventTargets);
+                return listTargetsResponse;
+            });
     }
-
 }
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
index 4ac545e..b44a4e3 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventTypeController.java
@@ -32,6 +32,7 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
 
 @RestController
 @RequestMapping("/type/")
@@ -44,27 +45,30 @@ public class EventTypeController {
     EventTypeService eventTypeService;
 
     @PostMapping(value = {"listEventTypes"})
-    public ListEventTypesResponse listEventTypes(@RequestBody 
ListEventTypesRequest listEventTypesRequest) {
-        PaginationResult<List<EventType>> paginationResult = 
eventTypeService.listEventTypes(
-            accountAPI.getResourceOwnerAccountId(), 
listEventTypesRequest.getEventBusName(),
-            listEventTypesRequest.getEventSourceName(), 
listEventTypesRequest.getNextToken(),
-            listEventTypesRequest.getMaxResults());
+    public Mono<ListEventTypesResponse> listEventTypes(@RequestBody 
ListEventTypesRequest listEventTypesRequest) {
+        return Mono.subscriberContext()
+            .map(ctx -> {
+                PaginationResult<List<EventType>> paginationResult = 
eventTypeService.listEventTypes(
+                    accountAPI.getResourceOwnerAccountId(ctx), 
listEventTypesRequest.getEventBusName(),
+                    listEventTypesRequest.getEventSourceName(), 
listEventTypesRequest.getNextToken(),
+                    listEventTypesRequest.getMaxResults());
 
-        List<EventTypeDTO> eventTypeDTOS = Lists.newArrayList();
-        paginationResult.getData()
-            .forEach(eventType -> {
-                EventTypeDTO eventTypeDTO = EventTypeDTO.builder()
-                    .eventBusName(eventType.getEventBusName())
-                    .eventSourceName(eventType.getEventSourceName())
-                    .eventTypeName(eventType.getName())
-                    .description(eventType.getDescription())
-                    .gmtCreate(eventType.getGmtCreate())
-                    .gmtModify(eventType.getGmtModify())
-                    .build();
-                eventTypeDTOS.add(eventTypeDTO);
+                List<EventTypeDTO> eventTypeDTOS = Lists.newArrayList();
+                paginationResult.getData()
+                    .forEach(eventType -> {
+                        EventTypeDTO eventTypeDTO = EventTypeDTO.builder()
+                            .eventBusName(eventType.getEventBusName())
+                            .eventSourceName(eventType.getEventSourceName())
+                            .eventTypeName(eventType.getName())
+                            .description(eventType.getDescription())
+                            .gmtCreate(eventType.getGmtCreate())
+                            .gmtModify(eventType.getGmtModify())
+                            .build();
+                        eventTypeDTOS.add(eventTypeDTO);
+                    });
+                return new ListEventTypesResponse(eventTypeDTOS, 
paginationResult.getNextToken(),
+                    paginationResult.getTotal(), 
listEventTypesRequest.getMaxResults());
             });
-        return new ListEventTypesResponse(eventTypeDTOS, 
paginationResult.getNextToken(), paginationResult.getTotal(),
-            listEventTypesRequest.getMaxResults());
     }
 
 }
diff --git 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
index 4ea9405..e15fccc 100644
--- 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
+++ 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ApiDestinationDTOControllerTest.java
@@ -42,6 +42,7 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
 
 import javax.validation.ConstraintViolation;
 import javax.validation.Validator;
@@ -67,83 +68,118 @@ public class ApiDestinationDTOControllerTest {
 
     @Before
     public void testBefore() {
-        
Mockito.when(accountAPI.getResourceOwnerAccountId()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+            .thenReturn(UUID.randomUUID()
+                .toString());
     }
 
     @Test
     public void testCreateApiDestination() {
-        
Mockito.when(apiDestinationService.createApiDestination(any())).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(apiDestinationService.createApiDestination(any()))
+            .thenReturn(UUID.randomUUID()
+                .toString());
         CreateApiDestinationRequest createApiDestinationRequest = new 
CreateApiDestinationRequest();
-        
createApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
-        
createApiDestinationRequest.setDescription(UUID.randomUUID().toString());
+        createApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        createApiDestinationRequest.setDescription(UUID.randomUUID()
+            .toString());
         HttpApiParameters httpApiParameters = new HttpApiParameters();
-        httpApiParameters.setEndpoint(UUID.randomUUID().toString());
-        httpApiParameters.setMethod(UUID.randomUUID().toString());
+        httpApiParameters.setEndpoint(UUID.randomUUID()
+            .toString());
+        httpApiParameters.setMethod(UUID.randomUUID()
+            .toString());
         createApiDestinationRequest.setHttpApiParameters(httpApiParameters);
         createApiDestinationRequest.setInvocationRateLimitPerSecond(11);
-        final CreateApiDestinationResponse apiDestination = 
apiDestinationController.createApiDestination(createApiDestinationRequest);
-        Assert.assertEquals(apiDestination.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        final Mono<CreateApiDestinationResponse> apiDestination = 
apiDestinationController.createApiDestination(
+            createApiDestinationRequest);
+        Assert.assertEquals(apiDestination.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testUpdateApiDestination() {
         Set<ConstraintViolation<UpdateApiDestinationRequest>> 
constraintViolations = new HashSet<>();
-        
Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class))).thenReturn(constraintViolations);
-        
Mockito.when(apiDestinationService.updateApiDestination(any())).thenReturn(Boolean.TRUE);
+        
Mockito.when(validator.validate(any(UpdateApiDestinationRequest.class)))
+            .thenReturn(constraintViolations);
+        Mockito.when(apiDestinationService.updateApiDestination(any()))
+            .thenReturn(Boolean.TRUE);
         UpdateApiDestinationRequest updateApiDestinationRequest = new 
UpdateApiDestinationRequest();
-        
updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
-        
updateApiDestinationRequest.setDescription(UUID.randomUUID().toString());
+        updateApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        updateApiDestinationRequest.setDescription(UUID.randomUUID()
+            .toString());
         HttpApiParameters httpApiParameters = new HttpApiParameters();
-        httpApiParameters.setEndpoint(UUID.randomUUID().toString());
-        httpApiParameters.setMethod(UUID.randomUUID().toString());
+        httpApiParameters.setEndpoint(UUID.randomUUID()
+            .toString());
+        httpApiParameters.setMethod(UUID.randomUUID()
+            .toString());
         updateApiDestinationRequest.setHttpApiParameters(httpApiParameters);
         updateApiDestinationRequest.setInvocationRateLimitPerSecond(11);
-        final UpdateApiDestinationResponse updateApiDestinationResponse = 
apiDestinationController.updateApiDestination(updateApiDestinationRequest);
-        Assert.assertEquals(updateApiDestinationResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        final Mono<UpdateApiDestinationResponse> updateApiDestinationResponse
+            = 
apiDestinationController.updateApiDestination(updateApiDestinationRequest);
+        Assert.assertEquals(updateApiDestinationResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testGetApiDestination() {
         Set<ConstraintViolation<GetApiDestinationRequest>> 
constraintViolations = new HashSet<>();
-        
Mockito.when(validator.validate(any(GetApiDestinationRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(GetApiDestinationRequest.class)))
+            .thenReturn(constraintViolations);
         ApiDestinationDTO eventApiDestinationDTO = new ApiDestinationDTO();
-        eventApiDestinationDTO.setName(UUID.randomUUID().toString());
-        Mockito.when(apiDestinationService.getApiDestination(any(), 
any())).thenReturn(eventApiDestinationDTO);
+        eventApiDestinationDTO.setName(UUID.randomUUID()
+            .toString());
+        Mockito.when(apiDestinationService.getApiDestination(any(), any()))
+            .thenReturn(eventApiDestinationDTO);
         GetApiDestinationRequest getApiDestinationRequest = new 
GetApiDestinationRequest();
-        
getApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
-        final GetApiDestinationResponse apiDestination = 
apiDestinationController.getApiDestination(getApiDestinationRequest);
-        Assert.assertEquals(apiDestination.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        getApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        final Mono<GetApiDestinationResponse> apiDestination = 
apiDestinationController.getApiDestination(
+            getApiDestinationRequest);
+        Assert.assertEquals(apiDestination.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testDeleteApiDestination() {
         Set<ConstraintViolation<DeleteApiDestinationRequest>> 
constraintViolations = new HashSet<>();
-        
Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class))).thenReturn(constraintViolations);
-        Mockito.when(apiDestinationService.deleteApiDestination(any(), 
any())).thenReturn(Boolean.TRUE);
+        
Mockito.when(validator.validate(any(DeleteApiDestinationRequest.class)))
+            .thenReturn(constraintViolations);
+        Mockito.when(apiDestinationService.deleteApiDestination(any(), any()))
+            .thenReturn(Boolean.TRUE);
         DeleteApiDestinationRequest deleteApiDestinationRequest = new 
DeleteApiDestinationRequest();
-        
deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID().toString());
-        final DeleteApiDestinationResponse deleteApiDestinationResponse = 
apiDestinationController.deleteApiDestination(deleteApiDestinationRequest);
-        Assert.assertEquals(deleteApiDestinationResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        deleteApiDestinationRequest.setApiDestinationName(UUID.randomUUID()
+            .toString());
+        final Mono<DeleteApiDestinationResponse> deleteApiDestinationResponse
+            = 
apiDestinationController.deleteApiDestination(deleteApiDestinationRequest);
+        Assert.assertEquals(deleteApiDestinationResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testListApiDestinations() {
         Set<ConstraintViolation<ListApiDestinationsRequest>> 
constraintViolations = new HashSet<>();
-        
Mockito.when(validator.validate(any(ListApiDestinationsRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(ListApiDestinationsRequest.class)))
+            .thenReturn(constraintViolations);
         PaginationResult<List<ApiDestinationDTO>> result = new 
PaginationResult();
         List<ApiDestinationDTO> apiDestinationDTOList = Lists.newArrayList();
         ApiDestinationDTO apiDestinationDTO = new ApiDestinationDTO();
-        apiDestinationDTO.setName(UUID.randomUUID().toString());
+        apiDestinationDTO.setName(UUID.randomUUID()
+            .toString());
         apiDestinationDTOList.add(apiDestinationDTO);
         result.setData(apiDestinationDTOList);
         result.setTotal(9);
         result.setNextToken("0");
-        Mockito.when(apiDestinationService.listApiDestinations(any(), any(), 
any(), anyInt())).thenReturn(result);
+        Mockito.when(apiDestinationService.listApiDestinations(any(), any(), 
any(), anyInt()))
+            .thenReturn(result);
         ListApiDestinationsRequest listApiDestinationsRequest = new 
ListApiDestinationsRequest();
-        
listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID().toString());
+        
listApiDestinationsRequest.setApiDestinationNamePrefix(UUID.randomUUID()
+            .toString());
         listApiDestinationsRequest.setNextToken("0");
         listApiDestinationsRequest.setMaxResults(10);
-        final ListApiDestinationsResponse listApiDestinationsResponse = 
apiDestinationController.listApiDestinations(listApiDestinationsRequest);
-        Assert.assertEquals(listApiDestinationsResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        final Mono<ListApiDestinationsResponse> listApiDestinationsResponse
+            = 
apiDestinationController.listApiDestinations(listApiDestinationsRequest);
+        Assert.assertEquals(listApiDestinationsResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 }
diff --git 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
index b21e8ef..e765868 100644
--- 
a/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
+++ 
b/adapter/api/src/test/java/org/apache/rocketmq/eventbridge/adapter/api/controller/ConnectionControllerTest.java
@@ -48,6 +48,7 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
+import reactor.core.publisher.Mono;
 
 import javax.validation.ConstraintViolation;
 import javax.validation.Validator;
@@ -74,93 +75,135 @@ public class ConnectionControllerTest {
 
     @Before
     public void testBefore() throws Exception {
-        
Mockito.when(accountAPI.getResourceOwnerAccountId()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(accountAPI.getResourceOwnerAccountId(any()))
+            .thenReturn(UUID.randomUUID()
+                .toString());
     }
 
     @Test
     public void testCreateConnection() {
-        
Mockito.when(connectionService.createConnection(any(ConnectionDTO.class))).thenReturn(UUID.randomUUID().toString());
+        
Mockito.when(connectionService.createConnection(any(ConnectionDTO.class)))
+            .thenReturn(UUID.randomUUID()
+                .toString());
         Set<ConstraintViolation<CreateConnectionRequest>> constraintViolations 
= new HashSet<>();
-        
Mockito.when(validator.validate(any(CreateConnectionRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(CreateConnectionRequest.class)))
+            .thenReturn(constraintViolations);
         CreateConnectionRequest createConnectionRequest = new 
CreateConnectionRequest();
-        
createConnectionRequest.setConnectionName(UUID.randomUUID().toString());
-        createConnectionRequest.setDescription(UUID.randomUUID().toString());
+        createConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        createConnectionRequest.setDescription(UUID.randomUUID()
+            .toString());
         NetworkParameters networkParameters = new NetworkParameters();
         
networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
-        networkParameters.setSecurityGroupId(UUID.randomUUID().toString());
-        networkParameters.setVpcId(UUID.randomUUID().toString());
-        networkParameters.setVswitcheId(UUID.randomUUID().toString());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
         createConnectionRequest.setNetworkParameters(networkParameters);
         AuthParameters authParameters = new AuthParameters();
         BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
-        basicAuthParameters.setPassword(UUID.randomUUID().toString());
-        basicAuthParameters.setUsername(UUID.randomUUID().toString());
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
         authParameters.setBasicAuthParameters(basicAuthParameters);
         
authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
         createConnectionRequest.setAuthParameters(authParameters);
-        final CreateConnectionResponse connection = 
connectionController.createConnection(createConnectionRequest);
-        Assert.assertEquals(connection.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        final Mono<CreateConnectionResponse> connection = 
connectionController.createConnection(
+            createConnectionRequest);
+        Assert.assertEquals(connection.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testDeleteConnection() {
-        
Mockito.doNothing().when(connectionService).deleteConnection(anyString(), 
anyString());
+        Mockito.doNothing()
+            .when(connectionService)
+            .deleteConnection(anyString(), anyString());
         Set<ConstraintViolation<DeleteConnectionRequest>> constraintViolations 
= new HashSet<>();
-        
Mockito.when(validator.validate(any(DeleteConnectionRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(DeleteConnectionRequest.class)))
+            .thenReturn(constraintViolations);
         DeleteConnectionRequest deleteConnectionRequest = new 
DeleteConnectionRequest();
-        
deleteConnectionRequest.setConnectionName(UUID.randomUUID().toString());
-        final DeleteConnectionResponse deleteConnectionResponse = 
connectionController.deleteConnection(deleteConnectionRequest);
-        Assert.assertEquals(deleteConnectionResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        deleteConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        final Mono<DeleteConnectionResponse> deleteConnectionResponse = 
connectionController.deleteConnection(
+            deleteConnectionRequest);
+        Assert.assertEquals(deleteConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testUpdateConnection() {
-        
Mockito.doNothing().when(connectionService).updateConnection(any(ConnectionDTO.class),
 anyString());
+        Mockito.doNothing()
+            .when(connectionService)
+            .updateConnection(any(ConnectionDTO.class), anyString());
         Set<ConstraintViolation<UpdateConnectionRequest>> constraintViolations 
= new HashSet<>();
-        
Mockito.when(validator.validate(any(UpdateConnectionRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(UpdateConnectionRequest.class)))
+            .thenReturn(constraintViolations);
         UpdateConnectionRequest updateConnectionRequest = new 
UpdateConnectionRequest();
-        
updateConnectionRequest.setConnectionName(UUID.randomUUID().toString());
-        updateConnectionRequest.setDescription(UUID.randomUUID().toString());
+        updateConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        updateConnectionRequest.setDescription(UUID.randomUUID()
+            .toString());
         NetworkParameters networkParameters = new NetworkParameters();
         
networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
-        networkParameters.setSecurityGroupId(UUID.randomUUID().toString());
-        networkParameters.setVpcId(UUID.randomUUID().toString());
-        networkParameters.setVswitcheId(UUID.randomUUID().toString());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
         updateConnectionRequest.setNetworkParameters(networkParameters);
         AuthParameters authParameters = new AuthParameters();
         BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
-        basicAuthParameters.setPassword(UUID.randomUUID().toString());
-        basicAuthParameters.setUsername(UUID.randomUUID().toString());
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
         authParameters.setBasicAuthParameters(basicAuthParameters);
         
authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
         updateConnectionRequest.setAuthParameters(authParameters);
-        final UpdateConnectionResponse updateConnectionResponse = 
connectionController.updateConnection(updateConnectionRequest);
-        Assert.assertEquals(updateConnectionResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        final Mono<UpdateConnectionResponse> updateConnectionResponse = 
connectionController.updateConnection(
+            updateConnectionRequest);
+        Assert.assertEquals(updateConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testGetConnection() {
         Set<ConstraintViolation<GetConnectionRequest>> constraintViolations = 
new HashSet<>();
-        
Mockito.when(validator.validate(any(GetConnectionRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(GetConnectionRequest.class)))
+            .thenReturn(constraintViolations);
         final ConnectionDTO connectionDTO = new ConnectionDTO();
         NetworkParameters networkParameters = new NetworkParameters();
         
networkParameters.setNetworkType(NetworkTypeEnum.PUBLIC_NETWORK.getNetworkType());
-        networkParameters.setSecurityGroupId(UUID.randomUUID().toString());
-        networkParameters.setVpcId(UUID.randomUUID().toString());
-        networkParameters.setVswitcheId(UUID.randomUUID().toString());
+        networkParameters.setSecurityGroupId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVpcId(UUID.randomUUID()
+            .toString());
+        networkParameters.setVswitcheId(UUID.randomUUID()
+            .toString());
         connectionDTO.setNetworkParameters(networkParameters);
         AuthParameters authParameters = new AuthParameters();
         BasicAuthParameters basicAuthParameters = new BasicAuthParameters();
-        basicAuthParameters.setPassword(UUID.randomUUID().toString());
-        basicAuthParameters.setUsername(UUID.randomUUID().toString());
+        basicAuthParameters.setPassword(UUID.randomUUID()
+            .toString());
+        basicAuthParameters.setUsername(UUID.randomUUID()
+            .toString());
         authParameters.setBasicAuthParameters(basicAuthParameters);
         
authParameters.setAuthorizationType(AuthorizationTypeEnum.BASIC_AUTH.getType());
         connectionDTO.setAuthParameters(authParameters);
-        BDDMockito.given(connectionService.getConnection(any(), 
any())).willReturn(connectionDTO);
+        BDDMockito.given(connectionService.getConnection(any(), any()))
+            .willReturn(connectionDTO);
         GetConnectionRequest getConnectionRequest = new GetConnectionRequest();
-        getConnectionRequest.setConnectionName(UUID.randomUUID().toString());
-        final GetConnectionResponse getConnectionResponse = 
connectionController.getConnection(getConnectionRequest);
-        Assert.assertEquals(getConnectionResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        getConnectionRequest.setConnectionName(UUID.randomUUID()
+            .toString());
+        final Mono<GetConnectionResponse> getConnectionResponse = 
connectionController.getConnection(
+            getConnectionRequest);
+        Assert.assertEquals(getConnectionResponse.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
@@ -168,25 +211,33 @@ public class ConnectionControllerTest {
         PaginationResult<List<ConnectionDTO>> result = new PaginationResult();
         List<ConnectionDTO> eventConnectionWithBLOBs = Lists.newArrayList();
         ConnectionDTO eventConnection = new ConnectionDTO();
-        eventConnection.setConnectionName(UUID.randomUUID().toString());
+        eventConnection.setConnectionName(UUID.randomUUID()
+            .toString());
         eventConnectionWithBLOBs.add(eventConnection);
         result.setData(eventConnectionWithBLOBs);
         result.setTotal(9);
         result.setNextToken("0");
-        Mockito.when(connectionService.listConnections(any(), any(), any(), 
anyInt())).thenReturn(result);
+        Mockito.when(connectionService.listConnections(any(), any(), any(), 
anyInt()))
+            .thenReturn(result);
         Set<ConstraintViolation<ListConnectionRequest>> constraintViolations = 
new HashSet<>();
-        
Mockito.when(validator.validate(any(ListConnectionRequest.class))).thenReturn(constraintViolations);
+        Mockito.when(validator.validate(any(ListConnectionRequest.class)))
+            .thenReturn(constraintViolations);
         ListConnectionRequest listConnectionRequest = new 
ListConnectionRequest();
-        
listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID().toString());
+        listConnectionRequest.setConnectionNamePrefix(UUID.randomUUID()
+            .toString());
         listConnectionRequest.setNextToken("0");
         listConnectionRequest.setMaxResults(10);
-        final ListConnectionResponse listConnectionResponse = 
connectionController.listConnections(listConnectionRequest);
-        Assert.assertEquals(listConnectionResponse.getCode(), 
EventBridgeErrorCode.Success.getCode());
+        final Mono<ListConnectionResponse> listConnections = 
connectionController.listConnections(
+            listConnectionRequest);
+        Assert.assertEquals(listConnections.block()
+            .getCode(), EventBridgeErrorCode.Success.getCode());
     }
 
     @Test
     public void testListEnumsResponse() {
-        final ListEnumsResponse listEnumsResponse = 
connectionController.listEnumsResponse();
-        Assert.assertEquals(listEnumsResponse.getNetworkTypeEnums().size(), 
NetworkTypeEnum.values().length);
+        final Mono<ListEnumsResponse> listEnumsResponse = 
connectionController.listEnumsResponse();
+        Assert.assertEquals(listEnumsResponse.block()
+            .getNetworkTypeEnums()
+            .size(), NetworkTypeEnum.values().length);
     }
 }
diff --git 
a/adapter/persistence/src/main/resources/db/migration/V3__register_source_acs_mns.sql
 
b/adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
similarity index 100%
rename from 
adapter/persistence/src/main/resources/db/migration/V3__register_source_acs_mns.sql
rename to 
adapter/persistence/src/main/resources/db/migration/V4__register_source_acs_mns.sql
diff --git 
a/adapter/persistence/src/main/resources/db/migration/V4__register_target_acs_eventbridge.sql
 
b/adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
similarity index 100%
rename from 
adapter/persistence/src/main/resources/db/migration/V4__register_target_acs_eventbridge.sql
rename to 
adapter/persistence/src/main/resources/db/migration/V6__register_target_acs_eventbridge.sql
diff --git a/adapter/rpc/pom.xml b/adapter/rpc/pom.xml
index 25390fe..e717ede 100644
--- a/adapter/rpc/pom.xml
+++ b/adapter/rpc/pom.xml
@@ -14,6 +14,7 @@
     <properties>
         <rocketmq.version>4.9.2</rocketmq.version>
         <httpclient.version>4.5.13</httpclient.version>
+        <reactor.version>3.1.7.RELEASE</reactor.version>
     </properties>
 
     <version>1.0.0-SNAPSHOT</version>
@@ -47,5 +48,10 @@
             <artifactId>httpclient</artifactId>
             <version>${httpclient.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <version>${reactor.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
index b4a28e6..a8c87fa 100644
--- 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
+++ 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AccountAPIImpl.java
@@ -19,30 +19,26 @@ package org.apache.rocketmq.eventbridge.adapter.rpc.impl;
 
 import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
 import org.springframework.stereotype.Component;
+import reactor.util.context.Context;
 
 @Component
-public class AccountAPIImpl implements AccountAPI {
+public class AccountAPIImpl implements AccountAPI<Context> {
     public static final String HEADER_KEY_LOGIN_ACCOUNT_ID = "loginAccountId";
     public static final String HEADER_KEY_PARENT_ACCOUNT_ID = 
"parentAccountId";
     public static final String HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID = 
"resourceOwnerAccountId";
 
     @Override
-    public String getLoginAccountId() {
-        return null;
+    public String getLoginAccountId(Context ctx) {
+        return ctx.get(HEADER_KEY_LOGIN_ACCOUNT_ID);
     }
 
     @Override
-    public String getParentAccountId() {
-        return null;
+    public String getParentAccountId(Context ctx) {
+        return ctx.get(HEADER_KEY_PARENT_ACCOUNT_ID);
     }
 
     @Override
-    public String getResourceOwnerAccountId() {
-        return "Admin";
-    }
-
-    @Override
-    public boolean isParentAccount() {
-        return false;
+    public String getResourceOwnerAccountId(Context ctx) {
+        return ctx.get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
     }
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
 
b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
index 5d66444..7f8472e 100644
--- 
a/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
+++ 
b/common/src/main/java/org/apache/rocketmq/eventbridge/exception/code/DefaultErrorCode.java
@@ -21,6 +21,7 @@ public enum DefaultErrorCode implements BaseErrorCode {
     //Default
     Success(200, "Success", "success"),
     InternalError(500, "InternalError", "InternalError"),
+    LoginFailed(409, "LoginFailed", "Login failed."),
     ;
 
     private final int httpCode;
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
index b19639e..c919aa7 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/AccountAPI.java
@@ -17,13 +17,32 @@
 
 package org.apache.rocketmq.eventbridge.domain.rpc;
 
-public interface AccountAPI {
+public interface AccountAPI<T> {
 
-    String getLoginAccountId();
+    /**
+     * Returns the account id of the logged-in caller, resolved from the given context.
+     *
+     * @param ctx implementation-defined context of type {@code T} that carries the account information
+     *
+     * @return the login account id
+     */
+    String getLoginAccountId(T ctx);
 
-    String getParentAccountId();
+    /**
+     * Returns the parent account id of the login account, resolved from the given context.
+     *
+     * @param ctx implementation-defined context of type {@code T} that carries the account information
+     *
+     * @return the parent account id
+     */
+    String getParentAccountId(T ctx);
 
-    String getResourceOwnerAccountId();
-
-    boolean isParentAccount();
+    /**
+     * Returns the account id of the resource owner, resolved from the given context.
+     *
+     * @param ctx implementation-defined context of type {@code T} that carries the account information
+     *
+     * @return the resource owner's account id
+     */
+    String getResourceOwnerAccountId(T ctx);
 }
diff --git 
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java 
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
index 261504b..c045270 100644
--- 
a/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
+++ 
b/start/src/main/java/org/apache/rocketmq/eventbridge/filter/LoginFilter.java
@@ -16,6 +16,11 @@
   */
  package org.apache.rocketmq.eventbridge.filter;
 
+ import java.util.List;
+
+ import lombok.extern.slf4j.Slf4j;
+ import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+ import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
  import org.springframework.core.annotation.Order;
  import org.springframework.http.server.reactive.ServerHttpRequest;
  import org.springframework.stereotype.Component;
@@ -26,17 +31,33 @@
 
  @Component
  @Order(value = 2)
+ @Slf4j
  public class LoginFilter implements WebFilter {
 
+     public static final String HEADER_KEY_LOGIN_ACCOUNT_ID = "loginAccountId";
+     public static final String HEADER_KEY_PARENT_ACCOUNT_ID = 
"parentAccountId";
+     public static final String HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID = 
"resourceOwnerAccountId";
+
      @Override
      public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain 
chain) {
          ServerHttpRequest request = exchange.getRequest();
-         //request.getBody().flatMap(dataBuffer -> {
-         //    Mono<Void> result = Mono.create(monoSink -> {
-         //        byte[] bytes  = dataBuffer.asByteBuffer().array();
-         //    });
-         //    return result;
-         //});
-         return chain.filter(exchange);
+         return chain.filter(exchange)
+             .subscriberContext(ctx -> {
+                 List<String> parentAccountIds = request.getHeaders()
+                     .get(HEADER_KEY_PARENT_ACCOUNT_ID);
+                 List<String> loginAccountIds = request.getHeaders()
+                     .get(HEADER_KEY_LOGIN_ACCOUNT_ID);
+                 List<String> resourceOwnerIds = request.getHeaders()
+                     .get(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID);
+                if(resourceOwnerIds == null || resourceOwnerIds.isEmpty()){
+                    throw new 
EventBridgeException(DefaultErrorCode.LoginFailed);
+                }
+                 return ctx.put(HEADER_KEY_PARENT_ACCOUNT_ID,
+                     parentAccountIds != null && !parentAccountIds.isEmpty() ? 
parentAccountIds.get(0) : "")
+                     .put(HEADER_KEY_LOGIN_ACCOUNT_ID,
+                         loginAccountIds != null && !loginAccountIds.isEmpty() 
? loginAccountIds.get(0) : "")
+                     .put(HEADER_KEY_RESOURCE_OWNER_ACCOUNT_ID,
+                         resourceOwnerIds != null && 
!resourceOwnerIds.isEmpty() ? resourceOwnerIds.get(0) : "");
+             });
      }
  }

Reply via email to