This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/main by this push:
     new 198096a  support http source (#9)
198096a is described below

commit 198096a9cf1843e817d2914823504b72e1a52c36
Author: 陈永明 <[email protected]>
AuthorDate: Thu May 12 20:05:45 2022 +0800

    support http source (#9)
    
    * support http source
    
    * support http source
    
    * bug fix
    
    * Modify the logic for generating httpSource token
    
    Co-authored-by: changfeng <[email protected]>
---
 .../api/controller/EventDataController.java        |  31 ++-
 .../api/controller/EventSourceController.java      |   2 +-
 .../adapter/api/converter/HttpEventConverter.java  | 271 +++++++++++++++++++++
 .../adapter/api/dto/data/HttpEventData.java        |  46 ++++
 .../api/dto/source/UpdateEventSourceRequest.java   |   3 +
 .../mybatis/converter/EventSourceConverter.java    |   7 +
 .../repository/MybatisEventSourceRepository.java   |   4 +-
 .../main/resources/mybatis/EventSourceMapper.xml   |   6 +-
 .../adapter/rpc/impl/AppConfigAPIImpl.java         |  12 +-
 .../adapter/rpc/impl/HttpEventAPIImpl.java         |  59 +++++
 .../rocketmq/eventbridge/config/AppConfig.java     |  11 +-
 .../rocketmq/eventbridge/config/GlobalConfig.java  |   2 +-
 .../rocketmq/eventbridge/config/LocalConfig.java   |   6 +-
 .../apache/rocketmq/eventbridge/tools/NetUtil.java |  61 +++++
 .../AppConfig.java => tools/TokenUtil.java}        |  27 +-
 .../tools/pattern/PatternEvaluatorTest.java        |   2 +-
 domain/pom.xml                                     |  12 +
 .../eventbridge/domain/cache/CacheManager.java     |  76 ++++++
 .../eventbridge/domain/cache/CacheName.java        |  26 ++
 .../domain/cache/GeneralKeyGenerator.java          |  54 ++++
 .../eventbridge/domain/common/enums/CacheEnum.java |  43 ++++
 .../domain/common/enums/EventSourceStatusEnum.java |  13 +
 .../common/exception/EventBridgeErrorCode.java     |  12 +
 .../model/source/ConnectEventSourceService.java    |   4 +-
 .../domain/model/source/EventSourceService.java    |  15 +-
 .../model/source/EventSourceServiceFactory.java    |   2 +-
 .../model/source/HTTPEventSourceService.java       | 262 +++++++++++++++++++-
 .../domain/repository/EventSourceRepository.java   |   2 +-
 .../eventbridge/domain/rpc/HttpEventAPI.java       |  57 +++++
 pom.xml                                            |  11 +
 30 files changed, 1089 insertions(+), 50 deletions(-)

diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
index d809b77..1c7b7b6 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
@@ -17,19 +17,10 @@
 
 package org.apache.rocketmq.eventbridge.adapter.api.controller;
 
-import java.util.List;
-import java.util.Map;
-
 import com.google.common.collect.Lists;
 import io.cloudevents.CloudEvent;
-import io.cloudevents.core.format.EventFormat;
-import io.cloudevents.core.message.MessageReader;
-import io.cloudevents.core.provider.EventFormatProvider;
-import io.cloudevents.http.HttpMessageFactory;
-import io.cloudevents.jackson.JsonFormat;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
-import io.netty.handler.codec.http.HttpHeaders;
 import 
org.apache.rocketmq.eventbridge.adapter.api.converter.EventConverterAdapter;
+import 
org.apache.rocketmq.eventbridge.adapter.api.converter.HttpEventConverter;
 import org.apache.rocketmq.eventbridge.adapter.api.dto.data.PutEventsResponse;
 import org.apache.rocketmq.eventbridge.adapter.api.handler.EventDataHandler;
 import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
@@ -41,9 +32,14 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestHeader;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Mono;
 
+import java.util.List;
+import java.util.Map;
+
 @RestController
 @RequestMapping("/")
 public class EventDataController {
@@ -57,6 +53,9 @@ public class EventDataController {
     @Autowired
     EventConverterAdapter eventConverterAdapter;
 
+    @Autowired
+    HttpEventConverter httpEventConverter;
+
     @PostMapping(value = {"putEvents"})
     public Mono<PutEventsResponse> putEvents(@RequestHeader Map<String, 
String> headers, @RequestBody byte[] body) {
         List<CloudEvent> cloudEvents = 
eventConverterAdapter.toEventsRequest(headers, body);
@@ -64,6 +63,18 @@ public class EventDataController {
         return 
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
     }
 
+    @RequestMapping(value = {"webhook/putEvents"})
+    public Mono<PutEventsResponse> putHttpEvents(ServerWebExchange 
serverWebExchange,
+                                                 @RequestHeader Map<String, 
String> headers,
+                                                 @RequestBody byte[] body,
+                                                 @RequestParam("token") String 
token) {
+        ServerHttpRequest request = serverWebExchange.getRequest();
+        List<CloudEvent> cloudEvents = 
httpEventConverter.toEventBridgeEvent(request, body,
+                headers, accountAPI.getResourceOwnerAccountId(), token);
+        List<EventBridgeEvent> eventList = 
this.converterEventBridgeEvent(cloudEvents);
+        return 
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
+    }
+
     private List<EventBridgeEvent> converterEventBridgeEvent(List<CloudEvent> 
cloudEvents) {
         List<EventBridgeEvent> eventList = 
Lists.newArrayListWithCapacity(cloudEvents.size());
         if (CollectionUtils.isEmpty(cloudEvents)) {
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
index f0ab381..7ad1e0a 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
@@ -72,7 +72,7 @@ public class EventSourceController {
         
eventSourceService.updateEventSource(accountAPI.getResourceOwnerAccountId(),
             updateEventSourceRequest.getEventBusName(), 
updateEventSourceRequest.getEventSourceName(),
             updateEventSourceRequest.getDescription(), 
updateEventSourceRequest.getClassName(),
-            updateEventSourceRequest.getConfig());
+            updateEventSourceRequest.getStatus(), 
updateEventSourceRequest.getConfig());
         return new UpdateEventSourceResponse();
     }
 
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/HttpEventConverter.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/HttpEventConverter.java
new file mode 100644
index 0000000..6e547ee
--- /dev/null
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/HttpEventConverter.java
@@ -0,0 +1,271 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+ import com.google.common.net.MediaType;
+ import com.google.common.reflect.TypeToken;
+ import com.google.gson.Gson;
+ import io.cloudevents.CloudEvent;
+ import io.cloudevents.core.v1.CloudEventBuilder;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.net.util.SubnetUtils;
+ import org.apache.rocketmq.eventbridge.adapter.api.dto.data.HttpEventData;
+ import org.apache.rocketmq.eventbridge.config.AppConfig;
+ import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
+ import 
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService;
+ import org.apache.rocketmq.eventbridge.domain.rpc.HttpEventAPI;
+ import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+ import org.apache.rocketmq.eventbridge.tools.NetUtil;
+ import org.apache.rocketmq.eventbridge.tools.transform.Data;
+ import org.apache.rocketmq.eventbridge.tools.transform.StringData;
+ import org.apache.rocketmq.eventbridge.tools.transform.Transform;
+ import org.apache.rocketmq.eventbridge.tools.transform.TransformBuilder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.http.HttpHeaders;
+ import org.springframework.http.HttpMethod;
+ import org.springframework.http.server.reactive.ServerHttpRequest;
+ import org.springframework.stereotype.Service;
+ import org.springframework.util.CollectionUtils;
+
+ import java.lang.reflect.Type;
+ import java.net.URI;
+ import java.nio.charset.StandardCharsets;
+ import java.time.OffsetDateTime;
+ import java.time.ZonedDateTime;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+
+ import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.JSON_ATTRIBUTE_INVALID;
+ import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.PutEventsRequestSecurityCheckFailed;
+ import static 
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService.SECURITY_CONFIG_IP;
+ import static 
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService.SECURITY_CONFIG_NONE;
+ import static 
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService.SECURITY_CONFIG_REFERER;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:28 上午
+ */
+@Service
+public class HttpEventConverter {
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpEventConverter.class);
+
+    @Autowired
+    HttpEventAPI httpEventAPI;
+    @Autowired
+    HTTPEventSourceService httpEventSourceService;
+
+    private static final String HEADER_X_REAL_IP = "x-real-ip";
+    private static final String TYPE = "eventbridge:Events:HTTPEvent";
+    private static final String DATA_CONTENT_TYPE = "application/json";
+
+    private static final String SECURITY_CONFIG = "SecurityConfig";
+    private static final String IP_CONFIG = "Ip";
+    private static final String METHOD_CONFIG = "Method";
+    private static final String REFERER_CONFIG = "Referer";
+
+    private static final Set<String> DISCARD_FIELDS = new HashSet<>();
+
+
+    static {
+        DISCARD_FIELDS.add("authorization");
+        DISCARD_FIELDS.add("cookie");
+        DISCARD_FIELDS.add("proxy-authorization");
+    }
+
+    public List<CloudEvent> toEventBridgeEvent(ServerHttpRequest request, 
byte[] body,
+                                               Map<String, String> headers, 
String accountId, String token) {
+        this.checkConfig(request, headers, accountId, token);
+
+        CloudEvent cloudEvent = parseRequest(request, body, headers, 
accountId, token, null, null);
+
+        return Collections.singletonList(cloudEvent);
+    }
+
+    private CloudEvent parseRequest(ServerHttpRequest request, byte[] body,
+                                    Map<String, String> headers, String 
accountId, String token, String extractJson,
+                                    String template) {
+        EventSource eventSource = 
httpEventSourceService.getEventSourceByToken(accountId, token);
+        HttpEventData httpEventData = getHttpEventData(request, body, headers, 
accountId, token);
+        Map<String, Object> schema = parseSchema(httpEventData, extractJson, 
template);
+        CloudEventBuilder builder = new CloudEventBuilder();
+        String regionId = AppConfig.getLocalConfig().getRegion();
+        CloudEventBuilder builderWithAttributes = addAttributes(regionId, 
accountId, eventSource.getName(), eventSource.getEventBusName(), schema, 
builder);
+        CloudEventBuilder builderWithExtensions = addExtensions(request, 
regionId, accountId, headers, eventSource, builderWithAttributes);
+        HttpEventData data = (HttpEventData) schema.get("data");
+        CloudEventBuilder builderWithData = builderWithExtensions.withData(new 
Gson().toJson(data).getBytes(StandardCharsets.UTF_8));
+        return builderWithData.build();
+    }
+
+    private CloudEventBuilder addAttributes(String regionId, String accountId, 
String sourceName, String busName, Map<String, Object> schema, 
CloudEventBuilder cloudEventBuilder) {
+        CloudEventBuilder newBuilder = cloudEventBuilder.newBuilder();
+        newBuilder.withId(UUID.randomUUID().toString())
+                .withSource(URI.create(sourceName))
+                .withDataContentType(DATA_CONTENT_TYPE)
+                .withDataSchema(null);
+
+        String subject = (String) schema.get("subject");
+        String time = (String) schema.get("time");
+        String type = (String) schema.get("type");
+
+        if (StringUtils.isNotBlank(subject)) {
+            newBuilder.withSubject(subject);
+        } else {
+            newBuilder.withSubject(httpEventAPI.generateSubject(regionId, 
accountId, busName, sourceName));
+        }
+
+        if (StringUtils.isNotBlank(time)) {
+            
newBuilder.withTime(OffsetDateTime.from(ZonedDateTime.parse(time)));
+        } else {
+            newBuilder.withTime(OffsetDateTime.from(ZonedDateTime.now()));
+
+        }
+        if (StringUtils.isNotBlank(type)) {
+            newBuilder.withType(type);
+        } else {
+            newBuilder.withType(TYPE);
+        }
+        return newBuilder;
+    }
+
+    private CloudEventBuilder addExtensions(ServerHttpRequest request,
+                                            String regionId,
+                                            String accountId,
+                                            Map<String, String> headers,
+                                            EventSource eventSource, 
CloudEventBuilder cloudEventBuilder) {
+        return httpEventAPI.addExtensions(request, regionId, accountId, 
headers, eventSource, cloudEventBuilder);
+    }
+
+    private void checkConfig(ServerHttpRequest request, Map<String, String> 
headers, String accountId, String token) {
+        HttpMethod requestMethod = request.getMethod();
+        String requestIp = 
request.getRemoteAddress().getAddress().getHostAddress();
+        if (headers.containsKey(HEADER_X_REAL_IP)) {
+            requestIp = headers.get(HEADER_X_REAL_IP);
+        }
+        String requestReferer = null;
+        if (headers.containsKey(HttpHeaders.REFERER)) {
+            requestReferer = headers.get(HttpHeaders.REFERER);
+        }
+
+        EventSource eventSource = 
httpEventSourceService.getEventSourceByToken(accountId, token);
+        String securityConfig = (String) 
eventSource.getConfig().get(SECURITY_CONFIG);
+        List<String> methods = (List<String>) 
eventSource.getConfig().get(METHOD_CONFIG);
+        List<String> ips = (List<String>) 
eventSource.getConfig().get(IP_CONFIG);
+        List<String> referers = (List<String>) 
eventSource.getConfig().get(REFERER_CONFIG);
+
+        // request method check
+        if (!CollectionUtils.isEmpty(methods) && !new 
HashSet<>(methods).contains(requestMethod.name())) {
+            throw new 
EventBridgeException(PutEventsRequestSecurityCheckFailed, "request methods", 
methods, requestMethod);
+        }
+
+        if 
(SECURITY_CONFIG_NONE.equals(eventSource.getConfig().get(SECURITY_CONFIG))) {
+            return;
+        }
+
+        // ip check
+        if (SECURITY_CONFIG_IP.equals(securityConfig) && 
!CollectionUtils.isEmpty(ips)) {
+            boolean matched = false;
+            for (String ip : ips) {
+                if (StringUtils.equals(ip, requestIp) ||
+                        (NetUtil.isNetSegment(ip) && new 
SubnetUtils(ip).getInfo().isInRange(requestIp))) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                throw new 
EventBridgeException(PutEventsRequestSecurityCheckFailed, "sourceIP", ips, 
requestIp);
+            }
+        }
+
+        // referer check
+        if (SECURITY_CONFIG_REFERER.equals(securityConfig) && 
!CollectionUtils.isEmpty(referers)) {
+            if (!new HashSet<>(referers).contains(requestReferer)) {
+                throw new 
EventBridgeException(PutEventsRequestSecurityCheckFailed, "secure domain", 
referers, requestReferer);
+            }
+        }
+    }
+
+    private HttpEventData getHttpEventData(ServerHttpRequest request, byte[] 
body, Map<String, String> headers,
+                                           String accountId, String token) {
+        HttpEventData httpEventData = new HttpEventData();
+        HashMap<String, String> dataHeaders = new HashMap<>();
+        headers.forEach((k, v) -> {
+            if (!DISCARD_FIELDS.contains(k)) {
+                dataHeaders.put(k, v);
+            }
+        });
+
+        Object bodyContent = new String(body);
+        HashMap<String, String> temp = new HashMap<>();
+        dataHeaders.forEach((k, v) -> {
+            temp.put(k.toLowerCase(), v);
+        });
+        try {
+            String contentType = temp.get("content-type");
+            if (StringUtils.isNotBlank(contentType)) {
+                MediaType type = MediaType.parse(contentType);
+                if (type.toString().contains("application/json")) {
+                    bodyContent = new Gson().fromJson((String) bodyContent, 
new TypeToken<Map<String, ?>>() {}.getType());
+                }
+            }
+        } catch (Exception e) {
+            logger.warn("GenerateEBHttpEventData failed. Http content is not a 
valid json format. accountId={}, token={}, content-type={}",
+                    accountId, token, temp.get("content-type"), e);
+            throw new EventBridgeException(JSON_ATTRIBUTE_INVALID);
+        }
+        httpEventData.setBody(bodyContent);
+        httpEventData.setHeaders(dataHeaders);
+        httpEventData.setHttpMethod(request.getMethod().toString());
+        httpEventData.setPath(request.getPath().toString());
+
+        HashMap<String, String> queryParam = new HashMap<>();
+        request.getQueryParams().forEach((k, v) -> {
+            if (!StringUtils.equals("token", k)) {
+                queryParam.put(k, v.get(0));
+            }
+        });
+        httpEventData.setQueryString(queryParam);
+        return httpEventData;
+    }
+
+    private Map<String, Object> parseSchema(HttpEventData httpEventData, 
String extractJson, String template) {
+        // If schema is not defined, default logic is executed
+        if (StringUtils.isBlank(template)) {
+            HashMap<String, Object> result = new HashMap<>();
+            result.put("data", httpEventData);
+            return result;
+        }
+        Transform transform = 
TransformBuilder.buildTemplateTransForm(extractJson, template);
+        StringData stringData = new StringData(new 
Gson().toJson(httpEventData));
+        Data output = transform.process(stringData);
+        Type mapType = new TypeToken<Map<String, Object>>() {}.getType();
+        Map<String, Object> objectMap = new Gson().fromJson(output.toString(), 
mapType);
+        Map<String, Object> templateData = (Map<String, Object>) 
objectMap.get("data");
+        // If data is not defined in the template, the default logic is 
executed
+        if (templateData == null) {
+            objectMap.put("data", httpEventData);
+        }
+        return objectMap;
+    }
+}
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/data/HttpEventData.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/data/HttpEventData.java
new file mode 100644
index 0000000..f66bb52
--- /dev/null
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/data/HttpEventData.java
@@ -0,0 +1,46 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.adapter.api.dto.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:27 上午
+ */
+@Data
+public class HttpEventData {
+    @JsonProperty("Body")
+    private Object body;
+
+    @JsonProperty("Headers")
+    private Map<String, String> headers;
+
+    @JsonProperty("HttpMethod")
+    private String httpMethod;
+
+    @JsonProperty("Path")
+    private String path;
+
+    @JsonProperty("QueryString")
+    private Map<String, String> queryString;
+
+}
diff --git 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
index 2c06cdd..6e8a18d 100644
--- 
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
+++ 
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
@@ -37,6 +37,9 @@ class UpdateEventSourceRequest extends BaseRequest {
     @SerializedName("ClassName")
     private String className;
 
+    @SerializedName("Status")
+    private Integer status;
+
     @SerializedName("Config")
     private Map<String, Object> config;
 }
diff --git 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
index ab5c952..e6be7a7 100644
--- 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
+++ 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
@@ -18,9 +18,13 @@
 package 
org.apache.rocketmq.eventbridge.adapter.persistence.source.mybatis.converter;
 
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
 import 
org.apache.rocketmq.eventbridge.adapter.persistence.source.mybatis.dataobject.EventSourceDO;
+import 
org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceStatusEnum;
 import org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceTypeEnum;
 import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
 
@@ -33,6 +37,9 @@ public class EventSourceConverter {
             .name(eventSourceDO.getName())
             .type(EventSourceTypeEnum.parseFromCode(eventSourceDO.getType()))
             .description(eventSourceDO.getDescription())
+            
.status(EventSourceStatusEnum.parseFromCode(eventSourceDO.getStatus()))
+            .config(new Gson().fromJson(eventSourceDO.getConfig(), new 
TypeToken<Map<String, Object>>() {}.getType()))
+            .className(eventSourceDO.getClassName())
             .gmtCreate(eventSourceDO.getGmtCreate())
             .gmtModify(eventSourceDO.getGmtModify())
             .build();
diff --git 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
index 42f80d2..09db39a 100644
--- 
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
+++ 
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
@@ -74,8 +74,8 @@
 
      @Override
      public boolean updateEventSource(String accountId, String eventBusName, 
String eventSourceName, String description,
-         Map<String, Object> config) {
-         return eventSourceMapper.updateEventSource(accountId, eventBusName, 
eventSourceName, description, null, null)
+         Integer status, Map<String, Object> config) {
+         return eventSourceMapper.updateEventSource(accountId, eventBusName, 
eventSourceName, description, status, new Gson().toJson(config))
              == 1;
      }
  }
\ No newline at end of file
diff --git 
a/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml 
b/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
index 9de32fd..320c1c8 100644
--- a/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
+++ b/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
@@ -78,13 +78,13 @@
         <set>
             gmt_modify = now(),
             <if test="description != null">
-                description= #{description}
+                description= #{description},
             </if>
             <if test="status != null">
-                status= #{status}
+                status= #{status},
             </if>
             <if test="config != null">
-                type= #{config}
+                config= #{config},
             </if>
         </set>
         WHERE account_id = #{accountId}
diff --git 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
index 4852e15..995da2c 100644
--- 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
+++ 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 import org.apache.rocketmq.eventbridge.config.AppConfig;
 import org.apache.rocketmq.eventbridge.config.GlobalConfig;
+import org.apache.rocketmq.eventbridge.config.LocalConfig;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -34,6 +35,15 @@ public class AppConfigAPIImpl {
         Set<String> extensionKeys = Sets.newHashSet();
         extensionKeys.add("aliyuneventbusname");
         globalConfig.setEventExtensionKeys(extensionKeys);
-        AppConfig.refresh(globalConfig);
+
+        LocalConfig localConfig = new LocalConfig();
+        localConfig.setRegion("cn-hangzhou");
+        
localConfig.setPublicHttpWebhookSchema("http://%s.eventbridge.%s.com/webhook/putEvents?token=%s";);
+        
localConfig.setPublicHttpsWebhookSchema("https://%s.eventbridge.%s.com/webhook/putEvents?token=%s";);
+        
localConfig.setVpcHttpWebhookSchema("http://%s.eventbridge-vpc.%s.com/webhook/putEvents?token=%s";);
+        
localConfig.setVpcHttpsWebhookSchema("https://%s.eventbridge-vpc.%s.com/webhook/putEvents?token=%s";);
+
+        AppConfig.refreshGlobalConfig(globalConfig);
+        AppConfig.refreshLocalConfig(localConfig);
     }
 }
diff --git 
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/HttpEventAPIImpl.java
 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/HttpEventAPIImpl.java
new file mode 100644
index 0000000..a6f8e8c
--- /dev/null
+++ 
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/HttpEventAPIImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.rpc.impl;
+
+import io.cloudevents.core.v1.CloudEventBuilder;
+import org.apache.rocketmq.eventbridge.config.AppConfig;
+import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
+import org.apache.rocketmq.eventbridge.domain.rpc.HttpEventAPI;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/27 5:21 下午
+ */
+@Service
+public class HttpEventAPIImpl implements HttpEventAPI {
+
+    public static final String EVENTSOURCE_PATTERN = 
"eventbridge:%s:%s:eventbus/%s/eventsource/%s";
+
+
+
+    @Override
+    public CloudEventBuilder addExtensions(ServerHttpRequest request,
+                                           String regionId,
+                                           String accountId,
+                                           Map<String, String> headers,
+                                           EventSource eventSource, 
CloudEventBuilder cloudEventBuilder) {
+        CloudEventBuilder newBuilder = cloudEventBuilder.newBuilder();
+
+        
newBuilder.withExtension(AppConfig.getGlobalConfig().getGetEventBusExtensionKey(),
+                eventSource.getEventBusName());
+
+        return newBuilder;
+    }
+
+    @Override
+    public String generateSubject(String region, String accountId, String 
eventBusName, String eventSourceFullName) {
+        return String.format(EVENTSOURCE_PATTERN, region, accountId, 
eventBusName,
+                eventSourceFullName);
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java 
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
index f047cec..0e0d11b 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
@@ -19,13 +19,22 @@ package org.apache.rocketmq.eventbridge.config;
 public abstract class AppConfig {
 
     protected static GlobalConfig globalConfig = new GlobalConfig();
+    protected static LocalConfig localConfig = new LocalConfig();
 
     public static GlobalConfig getGlobalConfig() {
         return globalConfig;
     }
 
-    public static void refresh(GlobalConfig globalConfig) {
+    public static LocalConfig getLocalConfig() {
+        return localConfig;
+    }
+
+    public static void refreshGlobalConfig(GlobalConfig globalConfig) {
         AppConfig.globalConfig = globalConfig;
     }
 
+    public static void refreshLocalConfig(LocalConfig localConfig) {
+        AppConfig.localConfig = localConfig;
+    }
+
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java 
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
index 67efd06..477b82f 100644
--- 
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
+++ 
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
@@ -29,7 +29,7 @@ class GlobalConfig {
 
     private String defaultDataPersistentClusterName;
 
-    private int eventSizeUpLimit = (2 ^ 10) * 64;
+    private int eventSizeUpLimit = (1 << 10) * 64;
 
 
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java 
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
index 7f5d39a..be81874 100644
--- 
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
+++ 
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
@@ -20,5 +20,9 @@ import lombok.Data;
 
 public @Data
 class LocalConfig {
-
+    private String region;
+    private String publicHttpWebhookSchema;
+    private String publicHttpsWebhookSchema;
+    private String vpcHttpWebhookSchema;
+    private String vpcHttpsWebhookSchema;
 }
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/tools/NetUtil.java 
b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/NetUtil.java
new file mode 100644
index 0000000..41bf466
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/NetUtil.java
@@ -0,0 +1,61 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.tools;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:21 上午
+ */
+public class NetUtil {
+    public static boolean isIpv4(String ip) {
+        ip = ip.trim();
+
+        try {
+            InetAddress address = InetAddress.getByName(ip);
+            if (address instanceof Inet4Address) {
+                return true;
+            }
+            return false;
+        } catch (UnknownHostException e) {
+            return false;
+        }
+    }
+
+    public static boolean isNetSegment(String netSegment) {
+        netSegment = netSegment.trim();
+        if (StringUtils.isNotBlank(netSegment) && netSegment.contains("/")) {
+            String[] parts = netSegment.split("/");
+            if (parts.length == 2) {
+                try {
+                    String net = parts[0];
+                    int maskLength = Integer.parseInt(parts[1]);
+                    return isIpv4(net) && maskLength >= 0 && maskLength <= 32;
+                } catch (Exception e) {
+                    return false;
+                }
+            }
+        }
+        return false;
+    }
+}
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java 
b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/TokenUtil.java
similarity index 62%
copy from 
common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
copy to 
common/src/main/java/org/apache/rocketmq/eventbridge/tools/TokenUtil.java
index f047cec..7a27fdf 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/TokenUtil.java
@@ -14,18 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.eventbridge.config;
 
-public abstract class AppConfig {
+package org.apache.rocketmq.eventbridge.tools;
 
-    protected static GlobalConfig globalConfig = new GlobalConfig();
+import java.util.UUID;
 
-    public static GlobalConfig getGlobalConfig() {
-        return globalConfig;
-    }
-
-    public static void refresh(GlobalConfig globalConfig) {
-        AppConfig.globalConfig = globalConfig;
+/**
+ * @Author changfeng
+ * @Date 2022/5/9 5:15 下午
+ */
+public class TokenUtil {
+    /**
+     * Generate the token of the http source.
+     * @return
+     */
+    public static String generateHttpSourceToken() {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < 4; i++) {
+            builder.append(UUID.randomUUID().toString().replace("-", ""));
+        }
+        return builder.toString();
     }
-
 }
diff --git 
a/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
 
b/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
index 0f13f9b..17f30a9 100644
--- 
a/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
+++ 
b/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
@@ -38,7 +38,7 @@ public class PatternEvaluatorTest {
         GlobalConfig globalConfig = new GlobalConfig();
         Set<String> eventExtensionKeys = Sets.newHashSet("aliyunregionid");
         globalConfig.setEventExtensionKeys(eventExtensionKeys);
-        AppConfig.refresh(globalConfig);
+        AppConfig.refreshGlobalConfig(globalConfig);
     }
 
     @Test
diff --git a/domain/pom.xml b/domain/pom.xml
index 26141e6..9df9b83 100644
--- a/domain/pom.xml
+++ b/domain/pom.xml
@@ -27,6 +27,14 @@
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-context-support</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+        </dependency>
 
         <!-- Log -->
         <dependency>
@@ -47,6 +55,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-text</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
 
         <!-- Test -->
         <dependency>
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheManager.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheManager.java
new file mode 100644
index 0000000..fdcb413
--- /dev/null
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheManager.java
@@ -0,0 +1,76 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.domain.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.rocketmq.eventbridge.domain.common.enums.CacheEnum;
+import org.springframework.cache.Cache;
+import org.springframework.cache.caffeine.CaffeineCache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:08 上午
+ */
+@Component
+public class CacheManager extends AbstractCacheManager {
+    private List<Cache> caches = new ArrayList<>();
+
+    public CacheManager() {
+        for (CacheEnum c : CacheEnum.values()) {
+            this.dynamicAddCache(new CaffeineCache(c.name(), 
Caffeine.newBuilder()
+                    .recordStats()
+                    .expireAfterWrite(c.getTtl(), TimeUnit.SECONDS)
+                    .maximumSize(c.getMaxSize())
+                    .build()));
+        }
+    }
+
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        return caches;
+    }
+
+    public Collection<Cache> getCaches() {
+        return caches;
+    }
+
+    public synchronized void dynamicAddCache(Cache cache) {
+        int existIndex = -1;
+        for (int i = 0; i < caches.size(); i++) {
+            if (cache.getName()
+                    .equals(caches.get(i)
+                            .getName())) {
+                existIndex = i;
+                break;
+            }
+        }
+        if (existIndex >= 0) {
+            this.caches.set(existIndex, cache);
+        } else {
+            this.caches.add(cache);
+        }
+        super.addCache(cache);
+    }
+}
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheName.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheName.java
new file mode 100644
index 0000000..d91b546
--- /dev/null
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheName.java
@@ -0,0 +1,26 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.domain.cache;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:14 上午
+ */
+public class CacheName {
+    public static final String EVENT_SOURCE = "event_source";
+}
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/GeneralKeyGenerator.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/GeneralKeyGenerator.java
new file mode 100644
index 0000000..395a6bd
--- /dev/null
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/GeneralKeyGenerator.java
@@ -0,0 +1,54 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.domain.cache;
+
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.cache.interceptor.SimpleKey;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.Method;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:14 上午
+ */
+@Component
+public class GeneralKeyGenerator implements KeyGenerator {
+    @Override
+    public Object generate(Object target, Method method, Object... params) {
+        return generateKey(params);
+    }
+
+    /**
+     * Generate a key based on the specified parameters.
+     */
+    public static Object generateKey(Object... params) {
+        if (params.length == 0) {
+            return SimpleKey.EMPTY;
+        } else {
+            StringBuilder keyBuilder = new StringBuilder();
+            for (int i = 0; i < params.length; i++) {
+                keyBuilder.append(params[i].toString());
+                if (i != params.length -1) {
+                    keyBuilder.append("/");
+                }
+            }
+            return new SimpleKey(keyBuilder.toString());
+        }
+    }
+}
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/CacheEnum.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/CacheEnum.java
new file mode 100644
index 0000000..34d4f74
--- /dev/null
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/CacheEnum.java
@@ -0,0 +1,43 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.rocketmq.eventbridge.domain.common.enums;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:09 上午
+ */
+public enum CacheEnum {
+    event_source(60, 500);
+
+
+    private int maxSize;
+    private int ttl;
+
+    CacheEnum(int ttl, int maxSize) {
+        this.ttl = ttl;
+        this.maxSize = maxSize;
+    }
+
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    public int getTtl() {
+        return ttl;
+    }
+}
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
index ab454ca..3ac6e57 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
@@ -17,6 +17,10 @@
 
 package org.apache.rocketmq.eventbridge.domain.common.enums;
 
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+
+import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.EventSourceStatusInvalid;
+
 public enum EventSourceStatusEnum {
     ACTIVATED(1),
     FROZEN(0);
@@ -31,4 +35,13 @@ public enum EventSourceStatusEnum {
         return code;
     }
 
+    public static EventSourceStatusEnum parseFromCode(int code) {
+        for (EventSourceStatusEnum sourceStatusType : 
EventSourceStatusEnum.values()) {
+            if (sourceStatusType.code == code) {
+                return sourceStatusType;
+            }
+        }
+        throw new EventBridgeException(EventSourceStatusInvalid, code);
+    }
+
 }
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
index 19d1349..c85b36c 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
@@ -23,10 +23,15 @@ public enum EventBridgeErrorCode implements BaseErrorCode {
     //Default
     Success(200, "Success", "success"),
     InternalError(500, "InternalError", "InternalError"),
+    GenerateTokenError(500, "GenerateTokenError", "Generate token failed, 
{0}."),
 
     //Put Events
     PutEventsRequestMoreThanOneEventBus(409, 
"PutEventsRequestMoreThanOneEventBus",
         "The put events request has more than one  event bus [{0}] "),
+    PutEventsRequestSecurityCheckFailed(409, 
"PutEventsRequestSecurityCheckFailed",
+            "The putEvents request failed the webhook security check for {0}. 
" +
+                    "Event source configuration is {1}, the parameter in the 
request is {2}."),
+    JSON_ATTRIBUTE_INVALID(409, "JsonAttributeInvalid", "The Json attribute is 
invalid"),
 
     //Event Bus
     EventBusNotExist(409, "EventBusNotExist", "The event bus [{0}] not 
existed!"),
@@ -47,8 +52,15 @@ public enum EventBridgeErrorCode implements BaseErrorCode {
         "The current count of event source is [{0}], which will exceed the 
limit quota [{1}]"),
     EventSourceNameInvalid(409, "EventSourceNameInvalid", "The event source 
name [{0}] is invalid!"),
     EventSourceTypeInvalid(409, "EventSourceTypeInvalid", "The event source 
type[{0}] is invalid!"),
+    EventSourceStatusInvalid(409, "EventSourceStatusInvalid", "The event 
source status[{0}] is invalid!"),
     EventSourceTypeOrClassInvalid(409, "EventSourceTypeOrClassInvalid",
         "The event source type[{0}] or class[{1}] is invalid!"),
+    HttpSourceParametersInvalid(409, "HttpSourceParametersInvalid",
+            "The parameters of http source is invalid. {0}"),
+    HttpSourceParametersEmpty(409, "HttpSourceParametersEmpty",
+            "The parameters of http source is empty or contains empty value. 
Invalid parameter name={0}"),
+    ExceedHttpSourceParametersCount(409, "ExceedHttpSourceParametersCount",
+            "Exceed http source parameters count limit. Limit count is {0}, 
and the value of {1} is {2}"),
 
     //Event Target
     EventTargetNotExist(409, "EventTargetNotExist",
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
index 381a98c..9469f42 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
@@ -76,9 +76,9 @@ public class ConnectEventSourceService extends 
EventSourceService {
     @Transactional
     @Override
     public boolean updateEventSource(String accountId, String eventBusName, 
String eventSourceName, String description,
-        String className, Map<String, Object> inputConfig) {
+        String className, Integer status, Map<String, Object> inputConfig) {
         boolean isSucceed = super.updateEventSource(accountId, eventBusName, 
eventSourceName, description, className,
-            inputConfig);
+            status, inputConfig);
         if (!Strings.isNullOrEmpty(className)) {
             return isSucceed && 
eventSourceRunnerService.updateEventSourceRunner(accountId, eventBusName,
                 eventSourceName, className, inputConfig, null);
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
index 65f2783..a9aed47 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
@@ -16,22 +16,21 @@
   */
  package org.apache.rocketmq.eventbridge.domain.model.source;
 
- import java.util.List;
- import java.util.Map;
-
  import com.google.common.base.Strings;
  import 
org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceStatusEnum;
  import 
org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceTypeEnum;
  import org.apache.rocketmq.eventbridge.domain.model.AbstractResourceService;
  import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
  import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService;
- import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
  import 
org.apache.rocketmq.eventbridge.domain.repository.EventSourceRepository;
  import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
  import org.springframework.dao.DuplicateKeyException;
  import org.springframework.stereotype.Service;
  import org.springframework.transaction.annotation.Transactional;
 
+ import java.util.List;
+ import java.util.Map;
+
  import static 
org.apache.rocketmq.eventbridge.domain.common.EventBridgeConstants.EVENT_SOURCE_COUNT_LIMIT;
  import static 
org.apache.rocketmq.eventbridge.domain.common.EventBridgeConstants.EVENT_SOURCE_NAME_MAX_LENGTH;
  import static 
org.apache.rocketmq.eventbridge.domain.common.EventBridgeConstants.EVENT_SOURCE_NAME_MIN_LENGTH;
@@ -45,8 +44,8 @@
  @Service
  public class EventSourceService extends AbstractResourceService {
 
-     private final EventBusService eventBusService;
-     private final EventSourceRepository eventSourceRepository;
+     protected final EventBusService eventBusService;
+     protected final EventSourceRepository eventSourceRepository;
 
      public EventSourceService(EventBusService eventBusService, 
EventSourceRepository eventSourceRepository) {
          this.eventBusService = eventBusService;
@@ -87,9 +86,9 @@
 
      @Transactional
      public boolean updateEventSource(String accountId, String eventBusName, 
String eventSourceName, String description,
-         String className, Map<String, Object> inputConfig) {
+         String className, Integer status, Map<String, Object> inputConfig) {
          this.checkExist(accountId, eventBusName, eventSourceName);
-         return eventSourceRepository.updateEventSource(accountId, 
eventBusName, eventSourceName, description,
+         return eventSourceRepository.updateEventSource(accountId, 
eventBusName, eventSourceName, description, status,
              inputConfig);
 
      }
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
index 86a14c5..f944e18 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
@@ -49,7 +49,7 @@
              .stream()
              .filter(eventSourceService -> eventSourceService.match(type, 
className))
              .findFirst();
-         if (eventSourceServiceOptional.get() == null) {
+         if (!eventSourceServiceOptional.isPresent()) {
              throw new EventBridgeException(EventSourceTypeOrClassInvalid, 
type, className);
          }
          return eventSourceServiceOptional.get();
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
index 002a365..c8c88e6 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
@@ -17,18 +17,61 @@
 
 package org.apache.rocketmq.eventbridge.domain.model.source;
 
-import java.util.Map;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.eventbridge.config.AppConfig;
+import org.apache.rocketmq.eventbridge.domain.cache.CacheManager;
+import org.apache.rocketmq.eventbridge.domain.cache.GeneralKeyGenerator;
 import org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceTypeEnum;
+import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
+import org.apache.rocketmq.eventbridge.domain.model.bus.EventBus;
 import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService;
 import org.apache.rocketmq.eventbridge.domain.repository.EventSourceRepository;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.apache.rocketmq.eventbridge.tools.NetUtil;
+import org.apache.rocketmq.eventbridge.tools.TokenUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.Cacheable;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.rocketmq.eventbridge.domain.cache.CacheName.EVENT_SOURCE;
+import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.ExceedHttpSourceParametersCount;
+import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.GenerateTokenError;
+import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.HttpSourceParametersEmpty;
+import static 
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.HttpSourceParametersInvalid;
+
 @Service
 public class HTTPEventSourceService extends EventSourceService {
+    private static final Logger logger = 
LoggerFactory.getLogger(HTTPEventSourceService.class);
 
     private static final String CLASS_NAME = "HttpEvent";
+    private static final Integer GET_TOKEN_TIMES = 100;
+    private static final String TOKEN_CONFIG = "Token";
+
+    private static final Set<String> SOURCE_PARAM_METHODS = new 
HashSet<>(Arrays.asList("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", 
"OPTIONS", "TRACE", "CONNECT"));
+    private static final Set<String> SOURCE_PARAM_TYPES = new 
HashSet<>(Arrays.asList("HTTP", "HTTPS", "HTTP&HTTPS"));
+
+    public static final String SECURITY_CONFIG_NONE = "none";
+    public static final String SECURITY_CONFIG_IP = "ip";
+    public static final String SECURITY_CONFIG_REFERER = "referer";
+    private static final Set<String> SOURCE_PARAM_SECURITY_CONFIG = new 
HashSet<>(Arrays.asList(SECURITY_CONFIG_NONE, SECURITY_CONFIG_IP, 
SECURITY_CONFIG_REFERER));
+
+    private static final Integer SECURITY_CONFIG_LENGTH = 5;
+    private static final Integer REFERER_LENGTH_LIMIT = 256;
 
+    @Autowired
+    CacheManager cacheManager;
     public HTTPEventSourceService(EventBusService eventBusService, 
EventSourceRepository eventSourceRepository) {
         super(eventBusService, eventSourceRepository);
     }
@@ -45,21 +88,226 @@ public class HTTPEventSourceService extends 
EventSourceService {
     @Transactional
     @Override
     public boolean createEventSource(String accountId, String eventBusName, 
String eventSourceName, String description,
-        String className, Map<String, Object> inputConfig) {
+                                     String className, Map<String, Object> 
inputConfig) {
         // 校验
         checkConfig(inputConfig);
         // 渲染
-        Map<String, Object> renderConfig = renderConfig(inputConfig);
+        Map<String, Object> renderConfig = renderConfig(accountId, 
eventBusName, eventSourceName, inputConfig);
         return super.createEventSource(accountId, eventBusName, 
eventSourceName, description, className, renderConfig);
+    }
+
+    @Override
+    public boolean deleteEventSource(String accountId, String eventBusName, 
String eventSourceName) {
+        this.evict(accountId, eventBusName, eventSourceName);
+        return super.deleteEventSource(accountId, eventBusName, 
eventSourceName);
+    }
 
+    @Override
+    public boolean updateEventSource(String accountId, String eventBusName, 
String eventSourceName, String description, String className, Integer status, 
Map<String, Object> inputConfig) {
+        this.evict(accountId, eventBusName, eventSourceName);
+        // 校验
+        checkConfig(inputConfig);
+        // 渲染
+        Map<String, Object> renderConfig = renderConfig(accountId, 
eventBusName, eventSourceName, inputConfig);
+        return super.updateEventSource(accountId, eventBusName, 
eventSourceName, description, className, status, renderConfig);
     }
 
     private void checkConfig(Map<String, Object> inputConfig) {
-        //TODO
+        HashMap<String, Object> sourceConfig = new 
HashMap<>(inputConfig.size());
+        inputConfig.forEach((k ,v) -> {
+            sourceConfig.put(k.toLowerCase(), v);
+        });
+        // check type
+        checkType((String) sourceConfig.get("type"));
+
+        // check method
+        checkMethod((List<String>) sourceConfig.get("method"));
+
+        // check security config
+        String securityConfig = (String) sourceConfig.get("securityconfig");
+        checkSecurityConfig(securityConfig);
+
+        // check ip
+        checkIp(securityConfig, (List<String>) sourceConfig.get("ip"));
+
+        // check referer
+        checkReferer(securityConfig, (List<String>) 
sourceConfig.get("referer"));
     }
 
-    private Map<String, Object> renderConfig(Map<String, Object> inputConfig) {
-        //TODO
+    private Map<String, Object> renderConfig(String accountId, String 
eventBusName, String eventSourceName, Map<String, Object> inputConfig) {
+        HashMap<String, Object> result = new HashMap<>(inputConfig);
+
+        // The ip and referer parameters from the sdk are empty when 
securityConfig is none
+        result.putIfAbsent("Ip", new ArrayList<>());
+        result.putIfAbsent("Referer", new ArrayList<>());
+
+        EventSource eventSource =  
eventSourceRepository.getEventSource(accountId, eventBusName, eventSourceName);
+
+        String regionId = AppConfig.getLocalConfig().getRegion();
+        String type = (String) inputConfig.get("Type");
+        String token;
+        if (eventSource != null && this.match(eventSource.getType(), 
eventSource.getClassName())) {
+            token = (String) eventSource.getConfig().get(TOKEN_CONFIG);
+        } else {
+            token = generateToken(accountId);
+        }
+        result.put(TOKEN_CONFIG, token);
+        result.put("PublicWebHookUrl", generateWebHookUrl(regionId, accountId, 
type, token, false));
+        result.put("VpcWebHookUrl", generateWebHookUrl(regionId, accountId, 
type, token, true));
+
+        return result;
+    }
+
+    public String generateToken(String accountId) throws EventBridgeException {
+        int count = GET_TOKEN_TIMES;
+        String token = TokenUtil.generateHttpSourceToken();
+
+        Set<String> tokenSet = new HashSet<>();
+        int busCount = eventBusService.getEventBusesCount(accountId);
+        PaginationResult<List<EventBus>> paginationResult =
+                eventBusService.listEventBuses(accountId, "0", busCount);
+
+        for (EventBus eventBus: paginationResult.getData()) {
+            int sourceCount = getEventSourceCount(accountId, 
eventBus.getName());
+            PaginationResult<List<EventSource>> listEventSources =
+                    listEventSources(accountId, eventBus.getName(), "0", 
sourceCount);
+
+            listEventSources.getData().stream()
+                    .filter(eventSource -> this.match(eventSource.getType(), 
eventSource.getClassName()))
+                    .forEach(eventSource -> tokenSet.add((String) 
eventSource.getConfig().get(TOKEN_CONFIG)));
+        }
+
+        if (count > 0 && tokenSet.contains(token)) {
+            token = TokenUtil.generateHttpSourceToken();
+            count--;
+        }
+        if (count == 0) {
+            throw new EventBridgeException(GenerateTokenError, "Get token 
failed with " + GET_TOKEN_TIMES + " retry times");
+        }
+        return token;
+    }
+
+    public List<String> generateWebHookUrl(String regionId, String accountId, 
String type, String token, boolean isVpc) {
+        List<String> webHookUrl = new ArrayList<>();
+        String httpWebHookSchema = isVpc ? 
AppConfig.getLocalConfig().getVpcHttpWebhookSchema() :
+                AppConfig.getLocalConfig().getPublicHttpWebhookSchema();
+        String httpsWebHookSchema = isVpc ? 
AppConfig.getLocalConfig().getVpcHttpsWebhookSchema() :
+                AppConfig.getLocalConfig().getPublicHttpsWebhookSchema();
+        if ("HTTP".equalsIgnoreCase(type)) {
+            webHookUrl.add(String.format(httpWebHookSchema, accountId, 
regionId, token));
+        } else if ("HTTPS".equalsIgnoreCase(type)) {
+            webHookUrl.add(String.format(httpsWebHookSchema, accountId, 
regionId, token));
+        } else {
+            webHookUrl.add(String.format(httpWebHookSchema, accountId, 
regionId, token));
+            webHookUrl.add(String.format(httpsWebHookSchema, accountId, 
regionId, token));
+        }
+        return webHookUrl;
+    }
+
+    private void checkType(String type) throws EventBridgeException {
+        if (StringUtils.isBlank(type)) {
+            throw new EventBridgeException(HttpSourceParametersEmpty, "Type");
+        }
+        if (!SOURCE_PARAM_TYPES.contains(type)) {
+            throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter name is Type and value is " + type);
+        }
+    }
+
+    private void checkMethod(List<String> methods) throws EventBridgeException 
{
+        if (CollectionUtils.isEmpty(methods)) {
+            throw new EventBridgeException(HttpSourceParametersEmpty, 
"Method");
+        }
+        for (String method : methods) {
+            if (StringUtils.isBlank(method)) {
+                throw new EventBridgeException(HttpSourceParametersEmpty, 
"Method");
+            }
+            if (!SOURCE_PARAM_METHODS.contains(method)) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter name is Method and value is " + method);
+            }
+        }
+    }
+
+    private void checkSecurityConfig(String securityConfig) throws 
EventBridgeException {
+        if (StringUtils.isBlank(securityConfig)) {
+            throw new EventBridgeException(HttpSourceParametersEmpty, 
"SecurityConfig");
+        }
+        if (!SOURCE_PARAM_SECURITY_CONFIG.contains(securityConfig)) {
+            throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter name is SecurityConfig and value is " + securityConfig);
+        }
+    }
+
+    private void checkIp(String securityConfig, List<String> subnets) throws 
EventBridgeException {
+        if (!SECURITY_CONFIG_IP.equals(securityConfig) && 
!CollectionUtils.isEmpty(subnets)) {
+            throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter Ip should be empty when SecurityConfig is " + securityConfig);
+        }
+        if (SECURITY_CONFIG_IP.equals(securityConfig)) {
+            if (CollectionUtils.isEmpty(subnets)) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter Ip should not be empty when SecurityConfig is " + securityConfig);
+            }
+            if (subnets.size() > SECURITY_CONFIG_LENGTH) {
+                throw new 
EventBridgeException(ExceedHttpSourceParametersCount, SECURITY_CONFIG_LENGTH, 
"Ip", subnets.size());
+            }
+            for (String subnet : subnets) {
+                if (!NetUtil.isIpv4(subnet) && !NetUtil.isNetSegment(subnet)) {
+                    throw new 
EventBridgeException(HttpSourceParametersInvalid, "Illegal IP or network 
segment: " + subnet);
+                }
+            }
+        }
+    }
+
+    private void checkReferer(String securityConfig, List<String> referers) 
throws EventBridgeException {
+        if (!SECURITY_CONFIG_REFERER.equals(securityConfig) && 
!CollectionUtils.isEmpty(referers)) {
+            throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter Referer should be empty when SecurityConfig is " + securityConfig);
+        }
+        if (SECURITY_CONFIG_REFERER.equals(securityConfig)) {
+            if (CollectionUtils.isEmpty(referers)) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, 
"Parameter Referer should not be empty when SecurityConfig is " + 
securityConfig);
+            }
+            if (referers.size() > SECURITY_CONFIG_LENGTH) {
+                throw new 
EventBridgeException(ExceedHttpSourceParametersCount, SECURITY_CONFIG_LENGTH, 
"Referer", referers.size());
+            }
+            for (String referer : referers) {
+                if (StringUtils.isBlank(referer)) {
+                    throw new EventBridgeException(HttpSourceParametersEmpty, 
"Referer");
+                }
+                if (referer.length() > REFERER_LENGTH_LIMIT) {
+                    throw new 
EventBridgeException(HttpSourceParametersInvalid, "Parameter Referer too long. 
referer=" + referer);
+                }
+            }
+        }
+    }
+
+
+    @Cacheable(cacheNames = EVENT_SOURCE, keyGenerator = 
"generalKeyGenerator", unless = "#result == null")
+    public EventSource getEventSourceByToken(String accountId, String token) {
+        try {
+            int busCount = this.eventBusService.getEventBusesCount(accountId);
+            PaginationResult<List<EventBus>> paginationResult =
+                    eventBusService.listEventBuses(accountId, "0", busCount);
+            for (EventBus eventBus: paginationResult.getData()) {
+                int sourceCount = 
eventSourceRepository.getEventSourceCount(accountId, eventBus.getName());
+                List<EventSource> eventSources = eventSourceRepository
+                        .listEventSources(accountId, eventBus.getName(), "0", 
sourceCount);
+                for (EventSource eventSource: eventSources) {
+                    String sourceToken = (String) 
eventSource.getConfig().get(TOKEN_CONFIG);
+                    if (StringUtils.isNotBlank(sourceToken) && 
StringUtils.equals(sourceToken, token)) {
+                        return eventSource;
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            logger.error("EventSourceCacheService getEventSourceByToken 
error.", t);
+        }
         return null;
     }
+
+    public void evict(String accountId, String eventBusName, String 
eventSourceName) {
+        EventSource eventSource = 
eventSourceRepository.getEventSource(accountId, eventBusName, eventSourceName);
+        if (eventSource != null) {
+            String token = (String) eventSource.getConfig().get(TOKEN_CONFIG);
+            if (StringUtils.isNotBlank(token)) {
+                
cacheManager.getCache(EVENT_SOURCE).evict(GeneralKeyGenerator.generateKey(accountId,
 token));
+            }
+        }
+    }
 }
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
index c2c8e25..e05b24b 100644
--- 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
@@ -38,5 +38,5 @@ public interface EventSourceRepository {
     List<EventSource> listEventSources(String accountId, String eventBusName, 
String nextToken, int maxResults);
 
     boolean updateEventSource(String accountId, String eventBusName, String 
eventSourceName, String description,
-        Map<String, Object> config);
+        Integer status, Map<String, Object> config);
 }
diff --git 
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/HttpEventAPI.java
 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/HttpEventAPI.java
new file mode 100644
index 0000000..cd9b825
--- /dev/null
+++ 
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/HttpEventAPI.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.domain.rpc;
+
+import io.cloudevents.core.v1.CloudEventBuilder;
+import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+import java.util.Map;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/27 5:20 下午
+ */
+public interface HttpEventAPI {
+    /**
+     * add extensions into the CloudEvent
+     * @param request
+     * @param regionId
+     * @param accountId
+     * @param headers
+     * @param eventSource
+     * @param cloudEventBuilder
+     * @return
+     */
+    CloudEventBuilder addExtensions(ServerHttpRequest request,
+                                    String regionId,
+                                    String accountId,
+                                    Map<String, String> headers,
+                                    EventSource eventSource, CloudEventBuilder 
cloudEventBuilder);
+
+    /**
+     * generate the subject attribute of CloudEvent
+     * @param region
+     * @param accountId
+     * @param eventBusName
+     * @param eventSourceFullName
+     * @return
+     */
+    String generateSubject(String region, String accountId, String 
eventBusName,
+                             String eventSourceFullName);
+}
diff --git a/pom.xml b/pom.xml
index 2f92458..579a941 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
         <cloudevents.version>2.3.0</cloudevents.version>
         <apache.commons-text.version>1.9</apache.commons-text.version>
         <mockito.version>2.13.0</mockito.version>
+        <caffeine.version>2.9.3</caffeine.version>
     </properties>
     <modules>
         <module>start</module>
@@ -101,6 +102,11 @@
                 <artifactId>spring-context</artifactId>
                 <version>${spring.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.springframework</groupId>
+                <artifactId>spring-context-support</artifactId>
+                <version>${spring.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.springframework</groupId>
                 <artifactId>spring-tx</artifactId>
@@ -163,6 +169,11 @@
                 <artifactId>commons-text</artifactId>
                 <version>${apache.commons-text.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.ben-manes.caffeine</groupId>
+                <artifactId>caffeine</artifactId>
+                <version>${caffeine.version}</version>
+            </dependency>
 
             <!-- Log -->
             <dependency>

Reply via email to