This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/main by this push:
new 198096a support http source (#9)
198096a is described below
commit 198096a9cf1843e817d2914823504b72e1a52c36
Author: 陈永明 <[email protected]>
AuthorDate: Thu May 12 20:05:45 2022 +0800
support http source (#9)
* support http source
* support http source
* bug fix
* Modify the logic for generating httpSource token
Co-authored-by: changfeng <[email protected]>
---
.../api/controller/EventDataController.java | 31 ++-
.../api/controller/EventSourceController.java | 2 +-
.../adapter/api/converter/HttpEventConverter.java | 271 +++++++++++++++++++++
.../adapter/api/dto/data/HttpEventData.java | 46 ++++
.../api/dto/source/UpdateEventSourceRequest.java | 3 +
.../mybatis/converter/EventSourceConverter.java | 7 +
.../repository/MybatisEventSourceRepository.java | 4 +-
.../main/resources/mybatis/EventSourceMapper.xml | 6 +-
.../adapter/rpc/impl/AppConfigAPIImpl.java | 12 +-
.../adapter/rpc/impl/HttpEventAPIImpl.java | 59 +++++
.../rocketmq/eventbridge/config/AppConfig.java | 11 +-
.../rocketmq/eventbridge/config/GlobalConfig.java | 2 +-
.../rocketmq/eventbridge/config/LocalConfig.java | 6 +-
.../apache/rocketmq/eventbridge/tools/NetUtil.java | 61 +++++
.../AppConfig.java => tools/TokenUtil.java} | 27 +-
.../tools/pattern/PatternEvaluatorTest.java | 2 +-
domain/pom.xml | 12 +
.../eventbridge/domain/cache/CacheManager.java | 76 ++++++
.../eventbridge/domain/cache/CacheName.java | 26 ++
.../domain/cache/GeneralKeyGenerator.java | 54 ++++
.../eventbridge/domain/common/enums/CacheEnum.java | 43 ++++
.../domain/common/enums/EventSourceStatusEnum.java | 13 +
.../common/exception/EventBridgeErrorCode.java | 12 +
.../model/source/ConnectEventSourceService.java | 4 +-
.../domain/model/source/EventSourceService.java | 15 +-
.../model/source/EventSourceServiceFactory.java | 2 +-
.../model/source/HTTPEventSourceService.java | 262 +++++++++++++++++++-
.../domain/repository/EventSourceRepository.java | 2 +-
.../eventbridge/domain/rpc/HttpEventAPI.java | 57 +++++
pom.xml | 11 +
30 files changed, 1089 insertions(+), 50 deletions(-)
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
index d809b77..1c7b7b6 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventDataController.java
@@ -17,19 +17,10 @@
package org.apache.rocketmq.eventbridge.adapter.api.controller;
-import java.util.List;
-import java.util.Map;
-
import com.google.common.collect.Lists;
import io.cloudevents.CloudEvent;
-import io.cloudevents.core.format.EventFormat;
-import io.cloudevents.core.message.MessageReader;
-import io.cloudevents.core.provider.EventFormatProvider;
-import io.cloudevents.http.HttpMessageFactory;
-import io.cloudevents.jackson.JsonFormat;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
-import io.netty.handler.codec.http.HttpHeaders;
import
org.apache.rocketmq.eventbridge.adapter.api.converter.EventConverterAdapter;
+import
org.apache.rocketmq.eventbridge.adapter.api.converter.HttpEventConverter;
import org.apache.rocketmq.eventbridge.adapter.api.dto.data.PutEventsResponse;
import org.apache.rocketmq.eventbridge.adapter.api.handler.EventDataHandler;
import org.apache.rocketmq.eventbridge.domain.rpc.AccountAPI;
@@ -41,9 +32,14 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
+import java.util.List;
+import java.util.Map;
+
@RestController
@RequestMapping("/")
public class EventDataController {
@@ -57,6 +53,9 @@ public class EventDataController {
@Autowired
EventConverterAdapter eventConverterAdapter;
+ @Autowired
+ HttpEventConverter httpEventConverter;
+
@PostMapping(value = {"putEvents"})
public Mono<PutEventsResponse> putEvents(@RequestHeader Map<String,
String> headers, @RequestBody byte[] body) {
List<CloudEvent> cloudEvents =
eventConverterAdapter.toEventsRequest(headers, body);
@@ -64,6 +63,18 @@ public class EventDataController {
return
eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
}
+ /**
+  * Webhook entry point for HTTP event sources.
+  *
+  * <p>The raw request, its headers and body, and the {@code token} query parameter are handed
+  * to {@code httpEventConverter}; the resulting CloudEvents are converted to EventBridge
+  * events and forwarded to {@code eventDataHandler} under the resource owner's account id.
+  *
+  * @param serverWebExchange current exchange, used to obtain the underlying request
+  * @param headers           request headers as a single-valued map
+  * @param body              raw request body
+  * @param token             value of the {@code token} query parameter
+  * @return the response produced by {@code eventDataHandler.putEvents}
+  */
+ @RequestMapping(value = {"webhook/putEvents"})
+ public Mono<PutEventsResponse> putHttpEvents(ServerWebExchange serverWebExchange,
+     @RequestHeader Map<String, String> headers,
+     @RequestBody byte[] body,
+     @RequestParam("token") String token) {
+     // Arguments are evaluated left to right: the request is fetched before the account id.
+     List<EventBridgeEvent> eventList = this.converterEventBridgeEvent(
+         httpEventConverter.toEventBridgeEvent(serverWebExchange.getRequest(), body, headers,
+             accountAPI.getResourceOwnerAccountId(), token));
+     return eventDataHandler.putEvents(accountAPI.getResourceOwnerAccountId(), eventList);
+ }
+
private List<EventBridgeEvent> converterEventBridgeEvent(List<CloudEvent>
cloudEvents) {
List<EventBridgeEvent> eventList =
Lists.newArrayListWithCapacity(cloudEvents.size());
if (CollectionUtils.isEmpty(cloudEvents)) {
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
index f0ab381..7ad1e0a 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/controller/EventSourceController.java
@@ -72,7 +72,7 @@ public class EventSourceController {
eventSourceService.updateEventSource(accountAPI.getResourceOwnerAccountId(),
updateEventSourceRequest.getEventBusName(),
updateEventSourceRequest.getEventSourceName(),
updateEventSourceRequest.getDescription(),
updateEventSourceRequest.getClassName(),
- updateEventSourceRequest.getConfig());
+ updateEventSourceRequest.getStatus(),
updateEventSourceRequest.getConfig());
return new UpdateEventSourceResponse();
}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/HttpEventConverter.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/HttpEventConverter.java
new file mode 100644
index 0000000..6e547ee
--- /dev/null
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/converter/HttpEventConverter.java
@@ -0,0 +1,271 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.converter;
+
+ import com.google.common.net.MediaType;
+ import com.google.common.reflect.TypeToken;
+ import com.google.gson.Gson;
+ import io.cloudevents.CloudEvent;
+ import io.cloudevents.core.v1.CloudEventBuilder;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.commons.net.util.SubnetUtils;
+ import org.apache.rocketmq.eventbridge.adapter.api.dto.data.HttpEventData;
+ import org.apache.rocketmq.eventbridge.config.AppConfig;
+ import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
+ import
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService;
+ import org.apache.rocketmq.eventbridge.domain.rpc.HttpEventAPI;
+ import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+ import org.apache.rocketmq.eventbridge.tools.NetUtil;
+ import org.apache.rocketmq.eventbridge.tools.transform.Data;
+ import org.apache.rocketmq.eventbridge.tools.transform.StringData;
+ import org.apache.rocketmq.eventbridge.tools.transform.Transform;
+ import org.apache.rocketmq.eventbridge.tools.transform.TransformBuilder;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.http.HttpHeaders;
+ import org.springframework.http.HttpMethod;
+ import org.springframework.http.server.reactive.ServerHttpRequest;
+ import org.springframework.stereotype.Service;
+ import org.springframework.util.CollectionUtils;
+
+ import java.lang.reflect.Type;
+ import java.net.URI;
+ import java.nio.charset.StandardCharsets;
+ import java.time.OffsetDateTime;
+ import java.time.ZonedDateTime;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+
+ import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.JSON_ATTRIBUTE_INVALID;
+ import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.PutEventsRequestSecurityCheckFailed;
+ import static
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService.SECURITY_CONFIG_IP;
+ import static
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService.SECURITY_CONFIG_NONE;
+ import static
org.apache.rocketmq.eventbridge.domain.model.source.HTTPEventSourceService.SECURITY_CONFIG_REFERER;
+
+/**
+ * Turns an inbound webhook HTTP request into a CloudEvent for the HTTP event source that
+ * owns the request token.
+ *
+ * <p>The request is first checked against the event source configuration (allowed HTTP
+ * methods, then an IP or Referer allow-list depending on the configured security mode) and
+ * is then wrapped, together with its headers, path and query string, into a single event.
+ *
+ * <p>HTTP header names are case-insensitive, so every header lookup in this class ignores
+ * case.
+ *
+ * @Author changfeng
+ * @Date 2022/4/25 11:28 AM
+ */
+@Service
+public class HttpEventConverter {
+    private static final Logger logger = LoggerFactory.getLogger(HttpEventConverter.class);
+
+    @Autowired
+    HttpEventAPI httpEventAPI;
+    @Autowired
+    HTTPEventSourceService httpEventSourceService;
+
+    private static final String HEADER_X_REAL_IP = "x-real-ip";
+    private static final String HEADER_CONTENT_TYPE = "content-type";
+    private static final String TYPE = "eventbridge:Events:HTTPEvent";
+    private static final String DATA_CONTENT_TYPE = "application/json";
+
+    private static final String SECURITY_CONFIG = "SecurityConfig";
+    private static final String IP_CONFIG = "Ip";
+    private static final String METHOD_CONFIG = "Method";
+    private static final String REFERER_CONFIG = "Referer";
+
+    /** Lower-cased names of credential-bearing headers that are never copied into event data. */
+    private static final Set<String> DISCARD_FIELDS = new HashSet<>();
+
+    static {
+        DISCARD_FIELDS.add("authorization");
+        DISCARD_FIELDS.add("cookie");
+        DISCARD_FIELDS.add("proxy-authorization");
+    }
+
+    /**
+     * Validates the request against the event source bound to {@code token} and converts it
+     * into a single CloudEvent.
+     *
+     * @param request   the inbound HTTP request
+     * @param body      raw request body
+     * @param headers   request headers as a single-valued map
+     * @param accountId account that owns the event source
+     * @param token     webhook token identifying the event source
+     * @return a singleton list holding the converted event
+     * @throws EventBridgeException if a security check fails or a JSON body cannot be parsed
+     */
+    public List<CloudEvent> toEventBridgeEvent(ServerHttpRequest request, byte[] body,
+        Map<String, String> headers, String accountId, String token) {
+        // Resolve the event source once; validation and conversion both need it.
+        EventSource eventSource = httpEventSourceService.getEventSourceByToken(accountId, token);
+        this.checkConfig(request, headers, eventSource);
+
+        CloudEvent cloudEvent = parseRequest(request, body, headers, accountId, token, eventSource,
+            null, null);
+
+        return Collections.singletonList(cloudEvent);
+    }
+
+    /**
+     * Builds the CloudEvent: attributes first, then extensions, then the serialized data.
+     * {@code extractJson} and {@code template} describe an optional schema transform; a blank
+     * template selects the default layout.
+     */
+    private CloudEvent parseRequest(ServerHttpRequest request, byte[] body, Map<String, String> headers,
+        String accountId, String token, EventSource eventSource, String extractJson, String template) {
+        HttpEventData httpEventData = getHttpEventData(request, body, headers, accountId, token);
+        Map<String, Object> schema = parseSchema(httpEventData, extractJson, template);
+        String regionId = AppConfig.getLocalConfig().getRegion();
+        CloudEventBuilder builderWithAttributes = addAttributes(regionId, accountId, eventSource.getName(),
+            eventSource.getEventBusName(), schema, new CloudEventBuilder());
+        CloudEventBuilder builderWithExtensions = addExtensions(request, regionId, accountId, headers,
+            eventSource, builderWithAttributes);
+        // "data" is an HttpEventData by default but a parsed map when a template supplied it,
+        // so serialize whatever is there instead of casting to HttpEventData.
+        Object data = schema.get("data");
+        CloudEventBuilder builderWithData = builderWithExtensions.withData(
+            new Gson().toJson(data).getBytes(StandardCharsets.UTF_8));
+        return builderWithData.build();
+    }
+
+    /**
+     * Sets id, source, content type, subject, time and type on a builder derived from
+     * {@code cloudEventBuilder}. Subject, time and type come from {@code schema} when present
+     * and non-blank; otherwise a generated subject, the current time and {@link #TYPE} are used.
+     */
+    private CloudEventBuilder addAttributes(String regionId, String accountId, String sourceName,
+        String busName, Map<String, Object> schema, CloudEventBuilder cloudEventBuilder) {
+        CloudEventBuilder newBuilder = cloudEventBuilder.newBuilder();
+        newBuilder.withId(UUID.randomUUID().toString())
+            .withSource(URI.create(sourceName))
+            .withDataContentType(DATA_CONTENT_TYPE)
+            .withDataSchema(null);
+
+        String subject = (String) schema.get("subject");
+        String time = (String) schema.get("time");
+        String type = (String) schema.get("type");
+
+        if (StringUtils.isNotBlank(subject)) {
+            newBuilder.withSubject(subject);
+        } else {
+            newBuilder.withSubject(httpEventAPI.generateSubject(regionId, accountId, busName, sourceName));
+        }
+
+        if (StringUtils.isNotBlank(time)) {
+            newBuilder.withTime(OffsetDateTime.from(ZonedDateTime.parse(time)));
+        } else {
+            newBuilder.withTime(OffsetDateTime.from(ZonedDateTime.now()));
+        }
+
+        if (StringUtils.isNotBlank(type)) {
+            newBuilder.withType(type);
+        } else {
+            newBuilder.withType(TYPE);
+        }
+        return newBuilder;
+    }
+
+    /** Delegates extension handling to {@code httpEventAPI}. */
+    private CloudEventBuilder addExtensions(ServerHttpRequest request,
+        String regionId,
+        String accountId,
+        Map<String, String> headers,
+        EventSource eventSource, CloudEventBuilder cloudEventBuilder) {
+        return httpEventAPI.addExtensions(request, regionId, accountId, headers, eventSource,
+            cloudEventBuilder);
+    }
+
+    /**
+     * Enforces the event source's request restrictions.
+     *
+     * <p>The method allow-list is always applied when configured. After that the security mode
+     * decides: none skips further checks, the IP mode requires the caller address to match an
+     * allow-list entry (exact address or CIDR block), and the Referer mode requires the Referer
+     * header to be one of the configured values.
+     *
+     * @throws EventBridgeException with {@code PutEventsRequestSecurityCheckFailed} on a mismatch
+     */
+    @SuppressWarnings("unchecked") // config values are untyped; lists are assumed to hold strings
+    private void checkConfig(ServerHttpRequest request, Map<String, String> headers,
+        EventSource eventSource) {
+        String securityConfig = (String) eventSource.getConfig().get(SECURITY_CONFIG);
+        List<String> methods = (List<String>) eventSource.getConfig().get(METHOD_CONFIG);
+        List<String> ips = (List<String>) eventSource.getConfig().get(IP_CONFIG);
+        List<String> referers = (List<String>) eventSource.getConfig().get(REFERER_CONFIG);
+
+        // request method check; an unknown method never satisfies a configured allow-list
+        HttpMethod requestMethod = request.getMethod();
+        if (!CollectionUtils.isEmpty(methods)
+            && (requestMethod == null || !methods.contains(requestMethod.name()))) {
+            throw new EventBridgeException(PutEventsRequestSecurityCheckFailed, "request methods",
+                methods, requestMethod);
+        }
+
+        if (SECURITY_CONFIG_NONE.equals(securityConfig)) {
+            return;
+        }
+
+        // ip check
+        if (SECURITY_CONFIG_IP.equals(securityConfig) && !CollectionUtils.isEmpty(ips)) {
+            String requestIp = resolveRequestIp(request, headers);
+            boolean matched = false;
+            for (String ip : ips) {
+                if (ipMatches(ip, requestIp)) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                throw new EventBridgeException(PutEventsRequestSecurityCheckFailed, "sourceIP", ips,
+                    requestIp);
+            }
+        }
+
+        // referer check
+        if (SECURITY_CONFIG_REFERER.equals(securityConfig) && !CollectionUtils.isEmpty(referers)) {
+            String requestReferer = findHeader(headers, HttpHeaders.REFERER);
+            if (!referers.contains(requestReferer)) {
+                throw new EventBridgeException(PutEventsRequestSecurityCheckFailed, "secure domain",
+                    referers, requestReferer);
+            }
+        }
+    }
+
+    /**
+     * Returns the caller address: the x-real-ip header when present and non-blank, otherwise the
+     * socket peer address, or {@code null} when neither is available.
+     */
+    private static String resolveRequestIp(ServerHttpRequest request, Map<String, String> headers) {
+        // NOTE(review): x-real-ip is client-controlled unless a trusted proxy overwrites it;
+        // confirm the deployment guarantees that before relying on the IP allow-list.
+        String realIp = findHeader(headers, HEADER_X_REAL_IP);
+        if (StringUtils.isNotBlank(realIp)) {
+            return realIp.trim();
+        }
+        // The remote address (or its resolved form) may be absent.
+        if (request.getRemoteAddress() == null || request.getRemoteAddress().getAddress() == null) {
+            return null;
+        }
+        return request.getRemoteAddress().getAddress().getHostAddress();
+    }
+
+    /**
+     * Tells whether {@code requestIp} is covered by one allow-list entry, which is either a
+     * single address or a CIDR block. Malformed entries and unparsable addresses never match.
+     */
+    private static boolean ipMatches(String allowed, String requestIp) {
+        if (allowed == null || requestIp == null) {
+            return false;
+        }
+        if (StringUtils.equals(allowed, requestIp)) {
+            return true;
+        }
+        if (!NetUtil.isNetSegment(allowed)) {
+            return false;
+        }
+        try {
+            SubnetUtils subnet = new SubnetUtils(allowed.trim());
+            // Count the network and broadcast addresses as part of the block; without this a
+            // /32 or /31 entry can never match any address.
+            subnet.setInclusiveHostCount(true);
+            return subnet.getInfo().isInRange(requestIp);
+        } catch (IllegalArgumentException e) {
+            // Entry rejected by SubnetUtils, or the request address is not dotted-quad IPv4.
+            return false;
+        }
+    }
+
+    /** Case-insensitive header lookup; returns {@code null} when the header is absent. */
+    private static String findHeader(Map<String, String> headers, String name) {
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+            if (name.equalsIgnoreCase(entry.getKey())) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Captures the request as an {@link HttpEventData}: headers minus credential-bearing ones,
+     * the body (parsed into a map when the content type is JSON, raw UTF-8 text otherwise),
+     * the method, the path and the query parameters except {@code token}.
+     *
+     * @throws EventBridgeException with {@code JSON_ATTRIBUTE_INVALID} when the content type is
+     *                              unparsable or a JSON body is malformed
+     */
+    private HttpEventData getHttpEventData(ServerHttpRequest request, byte[] body,
+        Map<String, String> headers, String accountId, String token) {
+        HttpEventData httpEventData = new HttpEventData();
+        Map<String, String> dataHeaders = new HashMap<>();
+        headers.forEach((k, v) -> {
+            // Compare in lower case so "Authorization" and "Cookie" are dropped as well.
+            if (!DISCARD_FIELDS.contains(k.toLowerCase(java.util.Locale.ROOT))) {
+                dataHeaders.put(k, v);
+            }
+        });
+
+        // Decode explicitly as UTF-8 rather than with the platform default charset.
+        Object bodyContent = new String(body, StandardCharsets.UTF_8);
+        String contentType = findHeader(headers, HEADER_CONTENT_TYPE);
+        try {
+            if (StringUtils.isNotBlank(contentType)) {
+                MediaType type = MediaType.parse(contentType);
+                if (type.toString().contains("application/json")) {
+                    bodyContent = new Gson().fromJson((String) bodyContent,
+                        new TypeToken<Map<String, ?>>() {}.getType());
+                }
+            }
+        } catch (Exception e) {
+            logger.warn("GenerateEBHttpEventData failed. Http content is not a valid json format. accountId={}, token={}, content-type={}",
+                accountId, token, contentType, e);
+            throw new EventBridgeException(JSON_ATTRIBUTE_INVALID);
+        }
+        httpEventData.setBody(bodyContent);
+        httpEventData.setHeaders(dataHeaders);
+        httpEventData.setHttpMethod(request.getMethod().toString());
+        httpEventData.setPath(request.getPath().toString());
+
+        Map<String, String> queryParam = new HashMap<>();
+        request.getQueryParams().forEach((k, v) -> {
+            // The token is a credential, not event content; only the first value of each
+            // remaining parameter is kept.
+            if (!StringUtils.equals("token", k) && v != null && !v.isEmpty()) {
+                queryParam.put(k, v.get(0));
+            }
+        });
+        httpEventData.setQueryString(queryParam);
+        return httpEventData;
+    }
+
+    /**
+     * Produces the attribute map for the event. With a blank template the map holds only
+     * {@code "data" -> httpEventData}; otherwise the template transform is applied to the
+     * serialized request and its output is used, with {@code httpEventData} filled in as
+     * {@code "data"} when the template did not define it.
+     */
+    private Map<String, Object> parseSchema(HttpEventData httpEventData, String extractJson,
+        String template) {
+        // If schema is not defined, default logic is executed
+        if (StringUtils.isBlank(template)) {
+            HashMap<String, Object> result = new HashMap<>();
+            result.put("data", httpEventData);
+            return result;
+        }
+        Transform transform = TransformBuilder.buildTemplateTransForm(extractJson, template);
+        StringData stringData = new StringData(new Gson().toJson(httpEventData));
+        Data output = transform.process(stringData);
+        Type mapType = new TypeToken<Map<String, Object>>() {}.getType();
+        Map<String, Object> objectMap = new Gson().fromJson(output.toString(), mapType);
+        // If data is not defined in the template, the default logic is executed
+        if (objectMap.get("data") == null) {
+            objectMap.put("data", httpEventData);
+        }
+        return objectMap;
+    }
+}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/data/HttpEventData.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/data/HttpEventData.java
new file mode 100644
index 0000000..f66bb52
--- /dev/null
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/data/HttpEventData.java
@@ -0,0 +1,46 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.api.dto.data;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * Data holder describing one inbound HTTP request: body, headers, method, path and query
+ * string. Accessors are generated by Lombok's {@code @Data}.
+ *
+ * NOTE(review): the property names below are declared with Jackson's {@code @JsonProperty},
+ * while HttpEventConverter serializes this class with Gson, which presumably does not honour
+ * that annotation - confirm which property names are intended in the emitted event.
+ *
+ * @Author changfeng
+ * @Date 2022/4/25 11:27 AM
+ */
+@Data
+public class HttpEventData {
+ // Request body; declared as Object because HttpEventConverter stores either the raw text
+ // or a parsed JSON map here.
+ @JsonProperty("Body")
+ private Object body;
+
+ // Request headers as a single-valued map.
+ @JsonProperty("Headers")
+ private Map<String, String> headers;
+
+ // HTTP method of the request, as text.
+ @JsonProperty("HttpMethod")
+ private String httpMethod;
+
+ // Request path, as text.
+ @JsonProperty("Path")
+ private String path;
+
+ // Query parameters as a single-valued map.
+ @JsonProperty("QueryString")
+ private Map<String, String> queryString;
+
+}
diff --git
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
index 2c06cdd..6e8a18d 100644
---
a/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
+++
b/adapter/api/src/main/java/org/apache/rocketmq/eventbridge/adapter/api/dto/source/UpdateEventSourceRequest.java
@@ -37,6 +37,9 @@ class UpdateEventSourceRequest extends BaseRequest {
@SerializedName("ClassName")
private String className;
+ @SerializedName("Status")
+ private Integer status;
+
@SerializedName("Config")
private Map<String, Object> config;
}
diff --git
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
index ab5c952..e6be7a7 100644
---
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
+++
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/converter/EventSourceConverter.java
@@ -18,9 +18,13 @@
package
org.apache.rocketmq.eventbridge.adapter.persistence.source.mybatis.converter;
import java.util.List;
+import java.util.Map;
import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
import
org.apache.rocketmq.eventbridge.adapter.persistence.source.mybatis.dataobject.EventSourceDO;
+import
org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceStatusEnum;
import org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceTypeEnum;
import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
@@ -33,6 +37,9 @@ public class EventSourceConverter {
.name(eventSourceDO.getName())
.type(EventSourceTypeEnum.parseFromCode(eventSourceDO.getType()))
.description(eventSourceDO.getDescription())
+
.status(EventSourceStatusEnum.parseFromCode(eventSourceDO.getStatus()))
+ .config(new Gson().fromJson(eventSourceDO.getConfig(), new
TypeToken<Map<String, Object>>() {}.getType()))
+ .className(eventSourceDO.getClassName())
.gmtCreate(eventSourceDO.getGmtCreate())
.gmtModify(eventSourceDO.getGmtModify())
.build();
diff --git
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
index 42f80d2..09db39a 100644
---
a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
+++
b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/source/mybatis/repository/MybatisEventSourceRepository.java
@@ -74,8 +74,8 @@
@Override
public boolean updateEventSource(String accountId, String eventBusName,
String eventSourceName, String description,
- Map<String, Object> config) {
- return eventSourceMapper.updateEventSource(accountId, eventBusName,
eventSourceName, description, null, null)
+ Integer status, Map<String, Object> config) {
+ return eventSourceMapper.updateEventSource(accountId, eventBusName,
eventSourceName, description, status, new Gson().toJson(config))
== 1;
}
}
\ No newline at end of file
diff --git
a/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
b/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
index 9de32fd..320c1c8 100644
--- a/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
+++ b/adapter/persistence/src/main/resources/mybatis/EventSourceMapper.xml
@@ -78,13 +78,13 @@
<set>
gmt_modify = now(),
<if test="description != null">
- description= #{description}
+ description= #{description},
</if>
<if test="status != null">
- status= #{status}
+ status= #{status},
</if>
<if test="config != null">
- type= #{config}
+ config= #{config},
</if>
</set>
WHERE account_id = #{accountId}
diff --git
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
index 4852e15..995da2c 100644
---
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
+++
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/AppConfigAPIImpl.java
@@ -22,6 +22,7 @@ import java.util.Set;
import com.google.common.collect.Sets;
import org.apache.rocketmq.eventbridge.config.AppConfig;
import org.apache.rocketmq.eventbridge.config.GlobalConfig;
+import org.apache.rocketmq.eventbridge.config.LocalConfig;
import org.springframework.stereotype.Component;
@Component
@@ -34,6 +35,15 @@ public class AppConfigAPIImpl {
Set<String> extensionKeys = Sets.newHashSet();
extensionKeys.add("aliyuneventbusname");
globalConfig.setEventExtensionKeys(extensionKeys);
- AppConfig.refresh(globalConfig);
+
+ LocalConfig localConfig = new LocalConfig();
+ localConfig.setRegion("cn-hangzhou");
+
localConfig.setPublicHttpWebhookSchema("http://%s.eventbridge.%s.com/webhook/putEvents?token=%s");
+
localConfig.setPublicHttpsWebhookSchema("https://%s.eventbridge.%s.com/webhook/putEvents?token=%s");
+
localConfig.setVpcHttpWebhookSchema("http://%s.eventbridge-vpc.%s.com/webhook/putEvents?token=%s");
+
localConfig.setVpcHttpsWebhookSchema("https://%s.eventbridge-vpc.%s.com/webhook/putEvents?token=%s");
+
+ AppConfig.refreshGlobalConfig(globalConfig);
+ AppConfig.refreshLocalConfig(localConfig);
}
}
diff --git
a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/HttpEventAPIImpl.java
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/HttpEventAPIImpl.java
new file mode 100644
index 0000000..a6f8e8c
--- /dev/null
+++
b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/HttpEventAPIImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.rpc.impl;
+
+import io.cloudevents.core.v1.CloudEventBuilder;
+import org.apache.rocketmq.eventbridge.config.AppConfig;
+import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
+import org.apache.rocketmq.eventbridge.domain.rpc.HttpEventAPI;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/27 5:21 下午
+ */
+@Service
+public class HttpEventAPIImpl implements HttpEventAPI {
+
+ public static final String EVENTSOURCE_PATTERN =
"eventbridge:%s:%s:eventbus/%s/eventsource/%s";
+
+
+
+ @Override
+ public CloudEventBuilder addExtensions(ServerHttpRequest request,
+ String regionId,
+ String accountId,
+ Map<String, String> headers,
+ EventSource eventSource,
CloudEventBuilder cloudEventBuilder) {
+ CloudEventBuilder newBuilder = cloudEventBuilder.newBuilder();
+
+
newBuilder.withExtension(AppConfig.getGlobalConfig().getGetEventBusExtensionKey(),
+ eventSource.getEventBusName());
+
+ return newBuilder;
+ }
+
+ @Override
+ public String generateSubject(String region, String accountId, String
eventBusName, String eventSourceFullName) {
+ return String.format(EVENTSOURCE_PATTERN, region, accountId,
eventBusName,
+ eventSourceFullName);
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
index f047cec..0e0d11b 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
@@ -19,13 +19,22 @@ package org.apache.rocketmq.eventbridge.config;
public abstract class AppConfig {
protected static GlobalConfig globalConfig = new GlobalConfig();
+ protected static LocalConfig localConfig = new LocalConfig();
public static GlobalConfig getGlobalConfig() {
return globalConfig;
}
- public static void refresh(GlobalConfig globalConfig) {
+ public static LocalConfig getLocalConfig() {
+ return localConfig;
+ }
+
+ public static void refreshGlobalConfig(GlobalConfig globalConfig) {
AppConfig.globalConfig = globalConfig;
}
+ public static void refreshLocalConfig(LocalConfig localConfig) {
+ AppConfig.localConfig = localConfig;
+ }
+
}
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
index 67efd06..477b82f 100644
---
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
+++
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/GlobalConfig.java
@@ -29,7 +29,7 @@ class GlobalConfig {
private String defaultDataPersistentClusterName;
- private int eventSizeUpLimit = (2 ^ 10) * 64;
+ private int eventSizeUpLimit = (1 << 10) * 64;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
index 7f5d39a..be81874 100644
---
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
+++
b/common/src/main/java/org/apache/rocketmq/eventbridge/config/LocalConfig.java
@@ -20,5 +20,9 @@ import lombok.Data;
public @Data
class LocalConfig {
-
+ private String region;
+ private String publicHttpWebhookSchema;
+ private String publicHttpsWebhookSchema;
+ private String vpcHttpWebhookSchema;
+ private String vpcHttpsWebhookSchema;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/tools/NetUtil.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/NetUtil.java
new file mode 100644
index 0000000..41bf466
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/NetUtil.java
@@ -0,0 +1,61 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.tools;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:21 上午
+ */
+public class NetUtil {
+    /**
+     * Checks whether the given text is an IPv4 address literal in dotted-decimal form.
+     *
+     * <p>The check is purely syntactic: four fields separated by dots, each made of one to three
+     * ASCII digits with a value of at most 255. No name resolution is performed, so host names
+     * (for example {@code localhost}), the empty string and shortened forms such as
+     * {@code 1.2.3} are rejected.
+     *
+     * @param ip the text to check; surrounding whitespace is ignored, {@code null} is allowed
+     * @return {@code true} if {@code ip} is a dotted-decimal IPv4 literal, {@code false} otherwise
+     */
+    public static boolean isIpv4(String ip) {
+        if (ip == null) {
+            return false;
+        }
+        // The -1 limit keeps trailing empty fields, so "1.2.3.4." yields five fields and is rejected.
+        String[] octets = ip.trim().split("\\.", -1);
+        if (octets.length != 4) {
+            return false;
+        }
+        for (String octet : octets) {
+            // At most three ASCII digits, so Integer.parseInt cannot fail or overflow here.
+            if (octet.length() > 3 || !isAsciiDigits(octet) || Integer.parseInt(octet) > 255) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether the given text is an IPv4 network segment in CIDR notation,
+     * i.e. {@code <ipv4>/<prefix length>} with a prefix length between 0 and 32.
+     *
+     * @param netSegment the text to check; surrounding whitespace is ignored, {@code null} is allowed
+     * @return {@code true} if {@code netSegment} is a valid IPv4 CIDR block, {@code false} otherwise
+     */
+    public static boolean isNetSegment(String netSegment) {
+        if (netSegment == null) {
+            return false;
+        }
+        String candidate = netSegment.trim();
+        int slash = candidate.indexOf('/');
+        // Exactly one slash is allowed; this also rejects inputs such as "10.0.0.0/8/".
+        if (slash < 0 || slash != candidate.lastIndexOf('/')) {
+            return false;
+        }
+        String maskLength = candidate.substring(slash + 1);
+        // One or two ASCII digits only: rejects signs, blanks and values that cannot be <= 32.
+        if (maskLength.length() > 2 || !isAsciiDigits(maskLength)) {
+            return false;
+        }
+        return isIpv4(candidate.substring(0, slash)) && Integer.parseInt(maskLength) <= 32;
+    }
+
+    /**
+     * Returns {@code true} if the text is non-empty and consists only of the characters '0'..'9'.
+     */
+    private static boolean isAsciiDigits(String text) {
+        if (text.isEmpty()) {
+            return false;
+        }
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            if (c < '0' || c > '9') {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/TokenUtil.java
similarity index 62%
copy from
common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
copy to
common/src/main/java/org/apache/rocketmq/eventbridge/tools/TokenUtil.java
index f047cec..7a27fdf 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/config/AppConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/TokenUtil.java
@@ -14,18 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.eventbridge.config;
-public abstract class AppConfig {
+package org.apache.rocketmq.eventbridge.tools;
- protected static GlobalConfig globalConfig = new GlobalConfig();
+import java.util.UUID;
- public static GlobalConfig getGlobalConfig() {
- return globalConfig;
- }
-
- public static void refresh(GlobalConfig globalConfig) {
- AppConfig.globalConfig = globalConfig;
+/**
+ * @Author changfeng
+ * @Date 2022/5/9 5:15 下午
+ */
+public class TokenUtil {
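+ /**
+ * Generates the token of an HTTP event source.
+ *
+ * @return the concatenation of four random UUIDs with their dashes removed (128 hexadecimal characters)
+ */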
+ public static String generateHttpSourceToken() {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ builder.append(UUID.randomUUID().toString().replace("-", ""));
+ }
+ return builder.toString();
}
-
}
diff --git
a/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
b/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
index 0f13f9b..17f30a9 100644
---
a/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
+++
b/common/src/test/java/org/apache/rocketmq/eventbridge/tools/pattern/PatternEvaluatorTest.java
@@ -38,7 +38,7 @@ public class PatternEvaluatorTest {
GlobalConfig globalConfig = new GlobalConfig();
Set<String> eventExtensionKeys = Sets.newHashSet("aliyunregionid");
globalConfig.setEventExtensionKeys(eventExtensionKeys);
- AppConfig.refresh(globalConfig);
+ AppConfig.refreshGlobalConfig(globalConfig);
}
@Test
diff --git a/domain/pom.xml b/domain/pom.xml
index 26141e6..9df9b83 100644
--- a/domain/pom.xml
+++ b/domain/pom.xml
@@ -27,6 +27,14 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context-support</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ </dependency>
<!-- Log -->
<dependency>
@@ -47,6 +55,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
<!-- Test -->
<dependency>
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheManager.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheManager.java
new file mode 100644
index 0000000..fdcb413
--- /dev/null
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheManager.java
@@ -0,0 +1,76 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.domain.cache;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.rocketmq.eventbridge.domain.common.enums.CacheEnum;
+import org.springframework.cache.Cache;
+import org.springframework.cache.caffeine.CaffeineCache;
+import org.springframework.cache.support.AbstractCacheManager;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:08 上午
+ */
+@Component
+public class CacheManager extends AbstractCacheManager {
+    // Caches known to this manager, in registration order, with at most one entry per cache name.
+    private List<Cache> caches = new ArrayList<>();
+
+    /**
+     * Registers one Caffeine-backed cache per {@link CacheEnum} constant. The constant's name
+     * becomes the cache name; its TTL (in seconds, counted from the write) and maximum size
+     * configure the underlying Caffeine cache, which also records statistics.
+     */
+    public CacheManager() {
+        for (CacheEnum definition : CacheEnum.values()) {
+            Cache cache = new CaffeineCache(definition.name(), Caffeine.newBuilder()
+                .recordStats()
+                .expireAfterWrite(definition.getTtl(), TimeUnit.SECONDS)
+                .maximumSize(definition.getMaxSize())
+                .build());
+            this.dynamicAddCache(cache);
+        }
+    }
+
+    /**
+     * Supplies the caches held by this manager to the {@link AbstractCacheManager} base class.
+     */
+    @Override
+    protected Collection<? extends Cache> loadCaches() {
+        return caches;
+    }
+
+    /**
+     * @return the live list of registered caches (not a copy)
+     */
+    public Collection<Cache> getCaches() {
+        return caches;
+    }
+
+    /**
+     * Adds a cache, or replaces the already registered cache that has the same name,
+     * and then hands it to the base class via {@code addCache}.
+     *
+     * @param cache the cache to register
+     */
+    public synchronized void dynamicAddCache(Cache cache) {
+        // Advance to the first registered cache with the same name, or to the end of the list.
+        int position = 0;
+        while (position < caches.size()
+            && !cache.getName().equals(caches.get(position).getName())) {
+            position++;
+        }
+        if (position < caches.size()) {
+            this.caches.set(position, cache);
+        } else {
+            this.caches.add(cache);
+        }
+        super.addCache(cache);
+    }
+}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheName.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheName.java
new file mode 100644
index 0000000..d91b546
--- /dev/null
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/CacheName.java
@@ -0,0 +1,26 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.domain.cache;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:14 上午
+ */
+public final class CacheName {
+    /**
+     * Name of the cache that holds event sources. It equals the name of the
+     * {@code CacheEnum.event_source} constant, which is the name the cache is registered under.
+     */
+    public static final String EVENT_SOURCE = "event_source";
+
+    // Constants holder; not meant to be instantiated.
+    private CacheName() {
+    }
+}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/GeneralKeyGenerator.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/GeneralKeyGenerator.java
new file mode 100644
index 0000000..395a6bd
--- /dev/null
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/cache/GeneralKeyGenerator.java
@@ -0,0 +1,54 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.domain.cache;
+
+import org.springframework.cache.interceptor.KeyGenerator;
+import org.springframework.cache.interceptor.SimpleKey;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.Method;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:14 上午
+ */
+@Component
+public class GeneralKeyGenerator implements KeyGenerator {
+    /**
+     * Builds the cache key for a method invocation from its arguments only; the target object
+     * and the method are ignored.
+     *
+     * @param target the object the method is invoked on (unused)
+     * @param method the invoked method (unused)
+     * @param params the invocation arguments
+     * @return the key produced by {@link #generateKey(Object...)}
+     */
+    @Override
+    public Object generate(Object target, Method method, Object... params) {
+        return generateKey(params);
+    }
+
+    /**
+     * Generates a key based on the specified parameters.
+     *
+     * <p>The string forms of the parameters are joined with {@code "/"} and wrapped in a
+     * {@link SimpleKey}. A {@code null} parameter contributes the text {@code "null"} instead of
+     * raising a {@link NullPointerException}.
+     *
+     * @param params the values that identify the cache entry; may be empty or {@code null}
+     * @return {@link SimpleKey#EMPTY} when there are no parameters, otherwise a {@link SimpleKey}
+     *         wrapping the joined string
+     */
+    public static Object generateKey(Object... params) {
+        if (params == null || params.length == 0) {
+            return SimpleKey.EMPTY;
+        }
+        StringBuilder keyBuilder = new StringBuilder();
+        for (int i = 0; i < params.length; i++) {
+            if (i > 0) {
+                keyBuilder.append("/");
+            }
+            // String.valueOf tolerates null elements, unlike params[i].toString().
+            keyBuilder.append(String.valueOf(params[i]));
+        }
+        return new SimpleKey(keyBuilder.toString());
+    }
+}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/CacheEnum.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/CacheEnum.java
new file mode 100644
index 0000000..34d4f74
--- /dev/null
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/CacheEnum.java
@@ -0,0 +1,43 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.domain.common.enums;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/25 11:09 上午
+ */
+public enum CacheEnum {
+    /**
+     * Cache of event sources: entries live for 60 seconds, at most 500 entries are kept.
+     */
+    event_source(60, 500);
+
+    // Time to live of an entry; CacheManager passes it to Caffeine with TimeUnit.SECONDS.
+    private final int ttl;
+    // Maximum number of entries the cache may hold.
+    private final int maxSize;
+
+    /**
+     * @param ttl     time to live of a cache entry, in seconds
+     * @param maxSize maximum number of entries in the cache
+     */
+    CacheEnum(int ttl, int maxSize) {
+        this.ttl = ttl;
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * @return the maximum number of entries in the cache
+     */
+    public int getMaxSize() {
+        return maxSize;
+    }
+
+    /**
+     * @return the time to live of a cache entry, in seconds
+     */
+    public int getTtl() {
+        return ttl;
+    }
+}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
index ab454ca..3ac6e57 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/enums/EventSourceStatusEnum.java
@@ -17,6 +17,10 @@
package org.apache.rocketmq.eventbridge.domain.common.enums;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+
+import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.EventSourceStatusInvalid;
+
public enum EventSourceStatusEnum {
ACTIVATED(1),
FROZEN(0);
@@ -31,4 +35,13 @@ public enum EventSourceStatusEnum {
return code;
}
+    /**
+     * Resolves the status constant whose numeric code equals the given value.
+     *
+     * @param code the numeric status code
+     * @return the matching status constant
+     * @throws EventBridgeException with {@code EventSourceStatusInvalid} if no constant has this code
+     */
+    public static EventSourceStatusEnum parseFromCode(int code) {
+        EventSourceStatusEnum[] candidates = EventSourceStatusEnum.values();
+        for (int i = 0; i < candidates.length; i++) {
+            EventSourceStatusEnum candidate = candidates[i];
+            if (candidate.code == code) {
+                return candidate;
+            }
+        }
+        throw new EventBridgeException(EventSourceStatusInvalid, code);
+    }
+
}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
index 19d1349..c85b36c 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/common/exception/EventBridgeErrorCode.java
@@ -23,10 +23,15 @@ public enum EventBridgeErrorCode implements BaseErrorCode {
//Default
Success(200, "Success", "success"),
InternalError(500, "InternalError", "InternalError"),
+ GenerateTokenError(500, "GenerateTokenError", "Generate token failed,
{0}."),
//Put Events
PutEventsRequestMoreThanOneEventBus(409,
"PutEventsRequestMoreThanOneEventBus",
"The put events request has more than one event bus [{0}] "),
+ PutEventsRequestSecurityCheckFailed(409,
"PutEventsRequestSecurityCheckFailed",
+ "The putEvents request failed the webhook security check for {0}.
" +
+ "Event source configuration is {1}, the parameter in the
request is {2}."),
+ JSON_ATTRIBUTE_INVALID(409, "JsonAttributeInvalid", "The Json attribute is
invalid"),
//Event Bus
EventBusNotExist(409, "EventBusNotExist", "The event bus [{0}] not
existed!"),
@@ -47,8 +52,15 @@ public enum EventBridgeErrorCode implements BaseErrorCode {
"The current count of event source is [{0}], which will exceed the
limit quota [{1}]"),
EventSourceNameInvalid(409, "EventSourceNameInvalid", "The event source
name [{0}] is invalid!"),
EventSourceTypeInvalid(409, "EventSourceTypeInvalid", "The event source
type[{0}] is invalid!"),
+ EventSourceStatusInvalid(409, "EventSourceStatusInvalid", "The event
source status[{0}] is invalid!"),
EventSourceTypeOrClassInvalid(409, "EventSourceTypeOrClassInvalid",
"The event source type[{0}] or class[{1}] is invalid!"),
+ HttpSourceParametersInvalid(409, "HttpSourceParametersInvalid",
+ "The parameters of http source is invalid. {0}"),
+ HttpSourceParametersEmpty(409, "HttpSourceParametersEmpty",
+ "The parameters of http source is empty or contains empty value.
Invalid parameter name={0}"),
+ ExceedHttpSourceParametersCount(409, "ExceedHttpSourceParametersCount",
+ "Exceed http source parameters count limit. Limit count is {0},
and the value of {1} is {2}"),
//Event Target
EventTargetNotExist(409, "EventTargetNotExist",
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
index 381a98c..9469f42 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/ConnectEventSourceService.java
@@ -76,9 +76,9 @@ public class ConnectEventSourceService extends
EventSourceService {
@Transactional
@Override
public boolean updateEventSource(String accountId, String eventBusName,
String eventSourceName, String description,
- String className, Map<String, Object> inputConfig) {
+ String className, Integer status, Map<String, Object> inputConfig) {
boolean isSucceed = super.updateEventSource(accountId, eventBusName,
eventSourceName, description, className,
- inputConfig);
+ status, inputConfig);
if (!Strings.isNullOrEmpty(className)) {
return isSucceed &&
eventSourceRunnerService.updateEventSourceRunner(accountId, eventBusName,
eventSourceName, className, inputConfig, null);
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
index 65f2783..a9aed47 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceService.java
@@ -16,22 +16,21 @@
*/
package org.apache.rocketmq.eventbridge.domain.model.source;
- import java.util.List;
- import java.util.Map;
-
import com.google.common.base.Strings;
import
org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceStatusEnum;
import
org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceTypeEnum;
import org.apache.rocketmq.eventbridge.domain.model.AbstractResourceService;
import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService;
- import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
import
org.apache.rocketmq.eventbridge.domain.repository.EventSourceRepository;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+ import java.util.List;
+ import java.util.Map;
+
import static
org.apache.rocketmq.eventbridge.domain.common.EventBridgeConstants.EVENT_SOURCE_COUNT_LIMIT;
import static
org.apache.rocketmq.eventbridge.domain.common.EventBridgeConstants.EVENT_SOURCE_NAME_MAX_LENGTH;
import static
org.apache.rocketmq.eventbridge.domain.common.EventBridgeConstants.EVENT_SOURCE_NAME_MIN_LENGTH;
@@ -45,8 +44,8 @@
@Service
public class EventSourceService extends AbstractResourceService {
- private final EventBusService eventBusService;
- private final EventSourceRepository eventSourceRepository;
+ protected final EventBusService eventBusService;
+ protected final EventSourceRepository eventSourceRepository;
public EventSourceService(EventBusService eventBusService,
EventSourceRepository eventSourceRepository) {
this.eventBusService = eventBusService;
@@ -87,9 +86,9 @@
@Transactional
public boolean updateEventSource(String accountId, String eventBusName,
String eventSourceName, String description,
- String className, Map<String, Object> inputConfig) {
+ String className, Integer status, Map<String, Object> inputConfig) {
this.checkExist(accountId, eventBusName, eventSourceName);
- return eventSourceRepository.updateEventSource(accountId,
eventBusName, eventSourceName, description,
+ return eventSourceRepository.updateEventSource(accountId,
eventBusName, eventSourceName, description, status,
inputConfig);
}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
index 86a14c5..f944e18 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/EventSourceServiceFactory.java
@@ -49,7 +49,7 @@
.stream()
.filter(eventSourceService -> eventSourceService.match(type,
className))
.findFirst();
- if (eventSourceServiceOptional.get() == null) {
+ if (!eventSourceServiceOptional.isPresent()) {
throw new EventBridgeException(EventSourceTypeOrClassInvalid,
type, className);
}
return eventSourceServiceOptional.get();
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
index 002a365..c8c88e6 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/source/HTTPEventSourceService.java
@@ -17,18 +17,61 @@
package org.apache.rocketmq.eventbridge.domain.model.source;
-import java.util.Map;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.eventbridge.config.AppConfig;
+import org.apache.rocketmq.eventbridge.domain.cache.CacheManager;
+import org.apache.rocketmq.eventbridge.domain.cache.GeneralKeyGenerator;
import org.apache.rocketmq.eventbridge.domain.common.enums.EventSourceTypeEnum;
+import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
+import org.apache.rocketmq.eventbridge.domain.model.bus.EventBus;
import org.apache.rocketmq.eventbridge.domain.model.bus.EventBusService;
import org.apache.rocketmq.eventbridge.domain.repository.EventSourceRepository;
+import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
+import org.apache.rocketmq.eventbridge.tools.NetUtil;
+import org.apache.rocketmq.eventbridge.tools.TokenUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.rocketmq.eventbridge.domain.cache.CacheName.EVENT_SOURCE;
+import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.ExceedHttpSourceParametersCount;
+import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.GenerateTokenError;
+import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.HttpSourceParametersEmpty;
+import static
org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode.HttpSourceParametersInvalid;
+
@Service
public class HTTPEventSourceService extends EventSourceService {
+ private static final Logger logger =
LoggerFactory.getLogger(HTTPEventSourceService.class);
private static final String CLASS_NAME = "HttpEvent";
+ private static final Integer GET_TOKEN_TIMES = 100;
+ private static final String TOKEN_CONFIG = "Token";
+
+ private static final Set<String> SOURCE_PARAM_METHODS = new
HashSet<>(Arrays.asList("GET", "POST", "PUT", "PATCH", "DELETE", "HEAD",
"OPTIONS", "TRACE", "CONNECT"));
+ private static final Set<String> SOURCE_PARAM_TYPES = new
HashSet<>(Arrays.asList("HTTP", "HTTPS", "HTTP&HTTPS"));
+
+ public static final String SECURITY_CONFIG_NONE = "none";
+ public static final String SECURITY_CONFIG_IP = "ip";
+ public static final String SECURITY_CONFIG_REFERER = "referer";
+ private static final Set<String> SOURCE_PARAM_SECURITY_CONFIG = new
HashSet<>(Arrays.asList(SECURITY_CONFIG_NONE, SECURITY_CONFIG_IP,
SECURITY_CONFIG_REFERER));
+
+ private static final Integer SECURITY_CONFIG_LENGTH = 5;
+ private static final Integer REFERER_LENGTH_LIMIT = 256;
+ @Autowired
+ CacheManager cacheManager;
public HTTPEventSourceService(EventBusService eventBusService,
EventSourceRepository eventSourceRepository) {
super(eventBusService, eventSourceRepository);
}
@@ -45,21 +88,226 @@ public class HTTPEventSourceService extends
EventSourceService {
@Transactional
@Override
public boolean createEventSource(String accountId, String eventBusName,
String eventSourceName, String description,
- String className, Map<String, Object> inputConfig) {
+ String className, Map<String, Object>
inputConfig) {
// 校验
checkConfig(inputConfig);
// 渲染
- Map<String, Object> renderConfig = renderConfig(inputConfig);
+ Map<String, Object> renderConfig = renderConfig(accountId,
eventBusName, eventSourceName, inputConfig);
return super.createEventSource(accountId, eventBusName,
eventSourceName, description, className, renderConfig);
+ }
+
+    /**
+     * Deletes an HTTP event source after dropping its token-keyed cache entry.
+     *
+     * <p>The eviction runs first because {@code evict} looks the event source up in the
+     * repository to find its token.
+     *
+     * @return the result of the inherited delete operation
+     */
+    @Override
+    public boolean deleteEventSource(String accountId, String eventBusName, String eventSourceName) {
+        this.evict(accountId, eventBusName, eventSourceName);
+        boolean deleted = super.deleteEventSource(accountId, eventBusName, eventSourceName);
+        return deleted;
+    }
+ @Override
+ public boolean updateEventSource(String accountId, String eventBusName,
String eventSourceName, String description, String className, Integer status,
Map<String, Object> inputConfig) {
+ this.evict(accountId, eventBusName, eventSourceName);
+ // 校验
+ checkConfig(inputConfig);
+ // 渲染
+ Map<String, Object> renderConfig = renderConfig(accountId,
eventBusName, eventSourceName, inputConfig);
+ return super.updateEventSource(accountId, eventBusName,
eventSourceName, description, className, status, renderConfig);
}
private void checkConfig(Map<String, Object> inputConfig) {
- //TODO
+ HashMap<String, Object> sourceConfig = new
HashMap<>(inputConfig.size());
+ inputConfig.forEach((k ,v) -> {
+ sourceConfig.put(k.toLowerCase(), v);
+ });
+ // check type
+ checkType((String) sourceConfig.get("type"));
+
+ // check method
+ checkMethod((List<String>) sourceConfig.get("method"));
+
+ // check security config
+ String securityConfig = (String) sourceConfig.get("securityconfig");
+ checkSecurityConfig(securityConfig);
+
+ // check ip
+ checkIp(securityConfig, (List<String>) sourceConfig.get("ip"));
+
+ // check referer
+ checkReferer(securityConfig, (List<String>)
sourceConfig.get("referer"));
}
- private Map<String, Object> renderConfig(Map<String, Object> inputConfig) {
- //TODO
+ private Map<String, Object> renderConfig(String accountId, String
eventBusName, String eventSourceName, Map<String, Object> inputConfig) {
+ HashMap<String, Object> result = new HashMap<>(inputConfig);
+
+ // The ip and referer parameters from the sdk are empty when
securityConfig is none
+ result.putIfAbsent("Ip", new ArrayList<>());
+ result.putIfAbsent("Referer", new ArrayList<>());
+
+ EventSource eventSource =
eventSourceRepository.getEventSource(accountId, eventBusName, eventSourceName);
+
+ String regionId = AppConfig.getLocalConfig().getRegion();
+ String type = (String) inputConfig.get("Type");
+ String token;
+ if (eventSource != null && this.match(eventSource.getType(),
eventSource.getClassName())) {
+ token = (String) eventSource.getConfig().get(TOKEN_CONFIG);
+ } else {
+ token = generateToken(accountId);
+ }
+ result.put(TOKEN_CONFIG, token);
+ result.put("PublicWebHookUrl", generateWebHookUrl(regionId, accountId,
type, token, false));
+ result.put("VpcWebHookUrl", generateWebHookUrl(regionId, accountId,
type, token, true));
+
+ return result;
+ }
+
+    /**
+     * Generates a token for an HTTP event source that differs from the tokens of all event
+     * sources of the account that this service matches.
+     *
+     * <p>The existing tokens are collected by listing every event bus of the account and every
+     * event source on each bus. A freshly generated token that collides with one of them is
+     * regenerated and checked again, up to {@code GET_TOKEN_TIMES} times.
+     *
+     * @param accountId the account whose existing tokens must be avoided
+     * @return a token not present among the collected tokens
+     * @throws EventBridgeException with {@code GenerateTokenError} if every retry produced a
+     *                              token that is already in use
+     */
+    public String generateToken(String accountId) throws EventBridgeException {
+        Set<String> tokenSet = new HashSet<>();
+        int busCount = eventBusService.getEventBusesCount(accountId);
+        PaginationResult<List<EventBus>> paginationResult =
+            eventBusService.listEventBuses(accountId, "0", busCount);
+
+        for (EventBus eventBus : paginationResult.getData()) {
+            int sourceCount = getEventSourceCount(accountId, eventBus.getName());
+            PaginationResult<List<EventSource>> listEventSources =
+                listEventSources(accountId, eventBus.getName(), "0", sourceCount);
+
+            listEventSources.getData().stream()
+                .filter(eventSource -> this.match(eventSource.getType(), eventSource.getClassName()))
+                .forEach(eventSource -> tokenSet.add((String) eventSource.getConfig().get(TOKEN_CONFIG)));
+        }
+
+        String token = TokenUtil.generateHttpSourceToken();
+        int retries = 0;
+        // Loop rather than a single check: every regenerated token must be verified as well.
+        while (tokenSet.contains(token)) {
+            if (retries == GET_TOKEN_TIMES) {
+                throw new EventBridgeException(GenerateTokenError, "Get token failed with " + GET_TOKEN_TIMES + " retry times");
+            }
+            token = TokenUtil.generateHttpSourceToken();
+            retries++;
+        }
+        return token;
+    }
+
+    /**
+     * Builds the webhook URL(s) of an HTTP event source from the schema templates in the
+     * local configuration.
+     *
+     * <p>Each template is filled via {@link String#format} with the account id, the region id
+     * and the token, in that order.
+     *
+     * @param regionId  the region id inserted into the template
+     * @param accountId the account id inserted into the template
+     * @param type      {@code HTTP} or {@code HTTPS} (case-insensitive) selects one URL; any other
+     *                  value yields the HTTP URL followed by the HTTPS URL
+     * @param token     the token inserted into the template
+     * @param isVpc     {@code true} to use the VPC templates, {@code false} for the public ones
+     * @return the generated URL(s)
+     */
+    public List<String> generateWebHookUrl(String regionId, String accountId, String type, String token, boolean isVpc) {
+        String httpTemplate;
+        if (isVpc) {
+            httpTemplate = AppConfig.getLocalConfig().getVpcHttpWebhookSchema();
+        } else {
+            httpTemplate = AppConfig.getLocalConfig().getPublicHttpWebhookSchema();
+        }
+        String httpsTemplate;
+        if (isVpc) {
+            httpsTemplate = AppConfig.getLocalConfig().getVpcHttpsWebhookSchema();
+        } else {
+            httpsTemplate = AppConfig.getLocalConfig().getPublicHttpsWebhookSchema();
+        }
+
+        boolean httpOnly = "HTTP".equalsIgnoreCase(type);
+        boolean httpsOnly = "HTTPS".equalsIgnoreCase(type);
+
+        List<String> urls = new ArrayList<>();
+        if (!httpsOnly) {
+            urls.add(String.format(httpTemplate, accountId, regionId, token));
+        }
+        if (!httpOnly) {
+            urls.add(String.format(httpsTemplate, accountId, regionId, token));
+        }
+        return urls;
+    }
+
+    /**
+     * Validates the {@code Type} parameter of an HTTP event source.
+     *
+     * @param type the configured type
+     * @throws EventBridgeException with {@code HttpSourceParametersEmpty} if the type is blank,
+     *                              or {@code HttpSourceParametersInvalid} if it is not one of
+     *                              {@code SOURCE_PARAM_TYPES}
+     */
+    private void checkType(String type) throws EventBridgeException {
+        boolean missing = StringUtils.isBlank(type);
+        if (missing) {
+            throw new EventBridgeException(HttpSourceParametersEmpty, "Type");
+        }
+        boolean supported = SOURCE_PARAM_TYPES.contains(type);
+        if (supported) {
+            return;
+        }
+        throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter name is Type and value is " + type);
+    }
+
+    /**
+     * Validates the {@code Method} parameter of an HTTP event source.
+     *
+     * @param methods the configured HTTP methods
+     * @throws EventBridgeException with {@code HttpSourceParametersEmpty} if the list is empty or
+     *                              contains a blank entry, or {@code HttpSourceParametersInvalid}
+     *                              if an entry is not one of {@code SOURCE_PARAM_METHODS}
+     */
+    private void checkMethod(List<String> methods) throws EventBridgeException {
+        if (CollectionUtils.isEmpty(methods)) {
+            throw new EventBridgeException(HttpSourceParametersEmpty, "Method");
+        }
+        for (int i = 0; i < methods.size(); i++) {
+            String candidate = methods.get(i);
+            if (StringUtils.isBlank(candidate)) {
+                throw new EventBridgeException(HttpSourceParametersEmpty, "Method");
+            }
+            boolean supported = SOURCE_PARAM_METHODS.contains(candidate);
+            if (!supported) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter name is Method and value is " + candidate);
+            }
+        }
+    }
+
+    /**
+     * Validates the {@code SecurityConfig} parameter of an HTTP event source.
+     *
+     * @param securityConfig the configured security mode
+     * @throws EventBridgeException with {@code HttpSourceParametersEmpty} if the value is blank,
+     *                              or {@code HttpSourceParametersInvalid} if it is not one of
+     *                              {@code SOURCE_PARAM_SECURITY_CONFIG}
+     */
+    private void checkSecurityConfig(String securityConfig) throws EventBridgeException {
+        boolean missing = StringUtils.isBlank(securityConfig);
+        if (missing) {
+            throw new EventBridgeException(HttpSourceParametersEmpty, "SecurityConfig");
+        }
+        boolean supported = SOURCE_PARAM_SECURITY_CONFIG.contains(securityConfig);
+        if (supported) {
+            return;
+        }
+        throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter name is SecurityConfig and value is " + securityConfig);
+    }
+
+    /**
+     * Validates the {@code Ip} parameter of an HTTP event source against its security mode.
+     *
+     * <p>When the security mode is not {@code ip}, the list must be empty. When it is {@code ip},
+     * the list must hold between one and {@code SECURITY_CONFIG_LENGTH} entries, none of them
+     * blank, and each entry must be an IPv4 address or an IPv4 network segment.
+     *
+     * @param securityConfig the configured security mode
+     * @param subnets        the configured IP addresses or network segments
+     * @throws EventBridgeException with {@code HttpSourceParametersInvalid},
+     *                              {@code HttpSourceParametersEmpty} or
+     *                              {@code ExceedHttpSourceParametersCount} when a rule is violated
+     */
+    private void checkIp(String securityConfig, List<String> subnets) throws EventBridgeException {
+        if (!SECURITY_CONFIG_IP.equals(securityConfig) && !CollectionUtils.isEmpty(subnets)) {
+            throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter Ip should be empty when SecurityConfig is " + securityConfig);
+        }
+        if (SECURITY_CONFIG_IP.equals(securityConfig)) {
+            if (CollectionUtils.isEmpty(subnets)) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter Ip should not be empty when SecurityConfig is " + securityConfig);
+            }
+            if (subnets.size() > SECURITY_CONFIG_LENGTH) {
+                throw new EventBridgeException(ExceedHttpSourceParametersCount, SECURITY_CONFIG_LENGTH, "Ip", subnets.size());
+            }
+            for (String subnet : subnets) {
+                // Reject null/blank entries explicitly, as checkMethod and checkReferer do,
+                // instead of relying on NetUtil to cope with them.
+                if (StringUtils.isBlank(subnet)) {
+                    throw new EventBridgeException(HttpSourceParametersEmpty, "Ip");
+                }
+                if (!NetUtil.isIpv4(subnet) && !NetUtil.isNetSegment(subnet)) {
+                    throw new EventBridgeException(HttpSourceParametersInvalid, "Illegal IP or network segment: " + subnet);
+                }
+            }
+        }
+    }
+
+    /**
+     * Validates the {@code Referer} parameter of an HTTP event source against its security mode.
+     *
+     * <p>When the security mode is not {@code referer}, the list must be empty. When it is
+     * {@code referer}, the list must hold between one and {@code SECURITY_CONFIG_LENGTH} entries,
+     * none of them blank or longer than {@code REFERER_LENGTH_LIMIT} characters.
+     *
+     * @param securityConfig the configured security mode
+     * @param referers       the configured referers
+     * @throws EventBridgeException with {@code HttpSourceParametersInvalid},
+     *                              {@code HttpSourceParametersEmpty} or
+     *                              {@code ExceedHttpSourceParametersCount} when a rule is violated
+     */
+    private void checkReferer(String securityConfig, List<String> referers) throws EventBridgeException {
+        boolean refererMode = SECURITY_CONFIG_REFERER.equals(securityConfig);
+        boolean noReferers = CollectionUtils.isEmpty(referers);
+
+        if (!refererMode) {
+            if (!noReferers) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter Referer should be empty when SecurityConfig is " + securityConfig);
+            }
+            return;
+        }
+
+        if (noReferers) {
+            throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter Referer should not be empty when SecurityConfig is " + securityConfig);
+        }
+        int refererCount = referers.size();
+        if (refererCount > SECURITY_CONFIG_LENGTH) {
+            throw new EventBridgeException(ExceedHttpSourceParametersCount, SECURITY_CONFIG_LENGTH, "Referer", referers.size());
+        }
+        for (String entry : referers) {
+            if (StringUtils.isBlank(entry)) {
+                throw new EventBridgeException(HttpSourceParametersEmpty, "Referer");
+            }
+            boolean tooLong = entry.length() > REFERER_LENGTH_LIMIT;
+            if (tooLong) {
+                throw new EventBridgeException(HttpSourceParametersInvalid, "Parameter Referer too long. referer=" + entry);
+            }
+        }
+    }
+
+
+ @Cacheable(cacheNames = EVENT_SOURCE, keyGenerator =
"generalKeyGenerator", unless = "#result == null")
+ public EventSource getEventSourceByToken(String accountId, String token) {
+ try {
+ int busCount = this.eventBusService.getEventBusesCount(accountId);
+ PaginationResult<List<EventBus>> paginationResult =
+ eventBusService.listEventBuses(accountId, "0", busCount);
+ for (EventBus eventBus: paginationResult.getData()) {
+ int sourceCount =
eventSourceRepository.getEventSourceCount(accountId, eventBus.getName());
+ List<EventSource> eventSources = eventSourceRepository
+ .listEventSources(accountId, eventBus.getName(), "0",
sourceCount);
+ for (EventSource eventSource: eventSources) {
+ String sourceToken = (String)
eventSource.getConfig().get(TOKEN_CONFIG);
+ if (StringUtils.isNotBlank(sourceToken) &&
StringUtils.equals(sourceToken, token)) {
+ return eventSource;
+ }
+ }
+ }
+ } catch (Throwable t) {
+ logger.error("EventSourceCacheService getEventSourceByToken
error.", t);
+ }
return null;
}
+
+    /**
+     * Removes the cache entry of an event source from the {@code EVENT_SOURCE} cache.
+     *
+     * <p>The entry is keyed by account id and token, so the event source is first loaded from
+     * the repository to obtain its token. Nothing happens if the event source does not exist
+     * or has no token.
+     *
+     * @param accountId       the account that owns the event source
+     * @param eventBusName    the event bus of the event source
+     * @param eventSourceName the name of the event source
+     */
+    public void evict(String accountId, String eventBusName, String eventSourceName) {
+        EventSource stored = eventSourceRepository.getEventSource(accountId, eventBusName, eventSourceName);
+        if (stored == null) {
+            return;
+        }
+        String token = (String) stored.getConfig().get(TOKEN_CONFIG);
+        if (StringUtils.isBlank(token)) {
+            return;
+        }
+        cacheManager.getCache(EVENT_SOURCE).evict(GeneralKeyGenerator.generateKey(accountId, token));
+    }
}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
index c2c8e25..e05b24b 100644
---
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventSourceRepository.java
@@ -38,5 +38,5 @@ public interface EventSourceRepository {
List<EventSource> listEventSources(String accountId, String eventBusName,
String nextToken, int maxResults);
boolean updateEventSource(String accountId, String eventBusName, String
eventSourceName, String description,
- Map<String, Object> config);
+ Integer status, Map<String, Object> config);
}
diff --git
a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/HttpEventAPI.java
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/HttpEventAPI.java
new file mode 100644
index 0000000..cd9b825
--- /dev/null
+++
b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/rpc/HttpEventAPI.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.domain.rpc;
+
+import io.cloudevents.core.v1.CloudEventBuilder;
+import org.apache.rocketmq.eventbridge.domain.model.source.EventSource;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+
+import java.util.Map;
+
+/**
+ * @Author changfeng
+ * @Date 2022/4/27 5:20 PM
+ */
+public interface HttpEventAPI {
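+ /**
+ * Adds extensions to the CloudEvent being built.
+ * @param request the incoming HTTP request
+ * @param regionId the region id
+ * @param accountId the account id
+ * @param headers header names mapped to their values
+ * @param eventSource the event source
+ * @param cloudEventBuilder the CloudEvent builder to add the extensions to
+ * @return the resulting CloudEvent builder
+ */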
+ CloudEventBuilder addExtensions(ServerHttpRequest request,
+ String regionId,
+ String accountId,
+ Map<String, String> headers,
+ EventSource eventSource, CloudEventBuilder
cloudEventBuilder);
+
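+ /**
+ * Generates the subject attribute of the CloudEvent.
+ * @param region the region
+ * @param accountId the account id
+ * @param eventBusName the event bus name
+ * @param eventSourceFullName the full name of the event source
+ * @return the generated subject
+ */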
+ String generateSubject(String region, String accountId, String
eventBusName,
+ String eventSourceFullName);
+}
diff --git a/pom.xml b/pom.xml
index 2f92458..579a941 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
<cloudevents.version>2.3.0</cloudevents.version>
<apache.commons-text.version>1.9</apache.commons-text.version>
<mockito.version>2.13.0</mockito.version>
+ <caffeine.version>2.9.3</caffeine.version>
</properties>
<modules>
<module>start</module>
@@ -101,6 +102,11 @@
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context-support</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
@@ -163,6 +169,11 @@
<artifactId>commons-text</artifactId>
<version>${apache.commons-text.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${caffeine.version}</version>
+ </dependency>
<!-- Log -->
<dependency>