This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 996c439e4 [ISSUE #4869] Add Webhook support for HTTP Source Connector
(#4913)
996c439e4 is described below
commit 996c439e457cd57d5c1b21341e55ea44ad58999e
Author: Zaki <[email protected]>
AuthorDate: Thu Jun 27 14:31:36 2024 +0800
[ISSUE #4869] Add Webhook support for HTTP Source Connector (#4913)
* feat: add webhook support
* feat: add CommonProtocol and optimize some logic
* doc: update some javaDoc
* fix: update something
* fix: Optimize queue related operations
* fix: Optimize queue related operations
* fix: Optimize queue related operations
* fix: update something
* fix: Remove some meaningless code
* fix: update something
* perf: Updates to protocol-related code, with a focus on request handling
* perf: use CircularFifoQueue
* perf: Use OkHttp Client to completely replace Apache Client and resolve
code conflicts.
---
.../connector/http/SourceConnectorConfig.java | 28 ++-
.../eventmesh-connector-http/build.gradle | 1 -
.../http/common/SynchronizedCircularFifoQueue.java | 148 ++++++++++++++
.../http/sink/handle/RetryHttpSinkHandler.java | 6 +-
.../http/sink/handle/WebhookHttpSinkHandler.java | 82 ++------
.../http/source/connector/HttpSourceConnector.java | 94 ++++-----
.../connector/http/source/data/CommonResponse.java | 74 +++++++
.../connector/http/source/data/WebhookRequest.java | 25 ++-
.../connector/http/source/protocol/Protocol.java | 58 ++++++
.../http/source/protocol/ProtocolFactory.java | 80 ++++++++
.../http/source/protocol/WebhookConstants.java | 50 +++++
.../source/protocol/impl/CloudEventProtocol.java | 107 ++++++++++
.../http/source/protocol/impl/CommonProtocol.java | 111 ++++++++++
.../http/source/protocol/impl/GitHubProtocol.java | 227 +++++++++++++++++++++
.../src/main/resources/source-config.yml | 9 +-
.../source/connector/HttpSourceConnectorTest.java | 104 ++++++----
.../src/test/resources/source-config.yml | 10 +-
17 files changed, 1039 insertions(+), 175 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 4ffd6a134..4f69f5504 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -17,6 +17,9 @@
package org.apache.eventmesh.common.config.connector.http;
+import java.util.HashMap;
+import java.util.Map;
+
import lombok.Data;
@Data
@@ -28,5 +31,28 @@ public class SourceConnectorConfig {
private int port;
- private int idleTimeout;
+ // timeunit: ms, default 5000ms
+ private int idleTimeout = 5000;
+
+ /**
+ * <ul>
+ * <li>The maximum size allowed for form attributes when Content-Type
is application/x-www-form-urlencoded or multipart/form-data </li>
+ * <li>Default is 1MB (1024 * 1024 bytes). </li>
+ * <li>If you receive a "size exceed allowed maximum capacity" error,
you can increase this value. </li>
+ * <li>Note: This applies only when handling form data
submissions.</li>
+ * </ul>
+ */
+ private int maxFormAttributeSize = 1024 * 1024;
+
+ // max size of the queue, default 1000
+ private int maxStorageSize = 1000;
+
+ // batch size, default 10
+ private int batchSize = 10;
+
+ // protocol, default CloudEvent
+ private String protocol = "CloudEvent";
+
+ // extra config, e.g. GitHub secret
+ private Map<String, String> extraConfig = new HashMap<>();
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle
b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index a6086bb1e..b64f7903b 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -24,7 +24,6 @@ dependencies {
implementation 'io.vertx:vertx-web-client:4.5.8'
implementation 'dev.failsafe:failsafe:3.3.2'
- testImplementation "org.apache.httpcomponents:httpclient"
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
new file mode 100644
index 000000000..439a9f3d7
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.common;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * A {@link CircularFifoQueue} whose queue operations overridden below synchronize on the queue instance.
+ * Methods that are not overridden here, such as {@code iterator()}, are inherited without added locking.
+ */
+public class SynchronizedCircularFifoQueue<E> extends CircularFifoQueue<E> {
+
+    /**
+     * <p>Default constructor. capacity = 32</p>
+     */
+    public SynchronizedCircularFifoQueue() {
+        super();
+    }
+
+    public SynchronizedCircularFifoQueue(Collection<? extends E> coll) {
+        super(coll);
+    }
+
+    public SynchronizedCircularFifoQueue(int size) {
+        super(size);
+    }
+
+    @Override
+    public synchronized boolean add(E element) {
+        return super.add(element);
+    }
+
+    @Override
+    public synchronized void clear() {
+        super.clear();
+    }
+
+    @Override
+    public synchronized E element() {
+        return super.element();
+    }
+
+    @Override
+    public synchronized E get(int index) {
+        return super.get(index);
+    }
+
+    @Override
+    public synchronized boolean isAtFullCapacity() {
+        return super.isAtFullCapacity();
+    }
+
+    @Override
+    public synchronized boolean isEmpty() {
+        return super.isEmpty();
+    }
+
+    @Override
+    public synchronized boolean isFull() {
+        return super.isFull();
+    }
+
+    @Override
+    public synchronized int maxSize() {
+        return super.maxSize();
+    }
+
+    @Override
+    public synchronized boolean offer(E element) {
+        return super.offer(element);
+    }
+
+    @Override
+    public synchronized E peek() {
+        return super.peek();
+    }
+
+    @Override
+    public synchronized E poll() {
+        return super.poll();
+    }
+
+    @Override
+    public synchronized E remove() {
+        return super.remove();
+    }
+
+    @Override
+    public synchronized int size() {
+        return super.size();
+    }
+
+    /**
+     * <p>Fetch a range of elements from the queue, in iteration order.</p>
+     * <p>An {@code end} beyond the current size is clamped, so a short or empty list is returned instead of failing.</p>
+     *
+     * @param start   start index (inclusive), must not be negative
+     * @param end     end index (exclusive), must not be less than {@code start}
+     * @param removed whether to remove the elements from the queue
+     * @return list of elements
+     * @throws IllegalArgumentException if {@code start < 0} or {@code start > end}
+     */
+    public synchronized List<E> fetchRange(int start, int end, boolean removed) {
+        if (start < 0 || start > end) {
+            throw new IllegalArgumentException("Invalid range");
+        }
+
+        // Callers page with a fixed size, so the requested window may extend past the stored elements.
+        int limit = Math.min(end, this.size());
+        List<E> items = new ArrayList<>(Math.max(limit - start, 0));
+
+        Iterator<E> iterator = this.iterator();
+        for (int index = 0; index < limit && iterator.hasNext(); index++) {
+            E item = iterator.next();
+            if (index < start || item == null) {
+                continue;
+            }
+            items.add(item);
+            if (removed) {
+                // Remove through the iterator so the traversal stays valid.
+                iterator.remove();
+            }
+        }
+        return items;
+    }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
index 06700261d..bc2a53610 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
@@ -132,7 +132,7 @@ public class RetryHttpSinkHandler implements
HttpSinkHandler {
// convert the result to an HttpExportRecord
HttpExportRecord exportRecord =
covertToExportRecord(httpConnectRecord, event, event.getResult(),
event.getException(), url, id);
// add the data to the queue
- ((WebhookHttpSinkHandler)
sinkHandler).addDataToQueue(exportRecord);
+ ((WebhookHttpSinkHandler)
sinkHandler).getReceivedDataQueue().offer(exportRecord);
}
})
.onRetry(event -> {
@@ -144,7 +144,7 @@ public class RetryHttpSinkHandler implements
HttpSinkHandler {
if (connectorConfig.getWebhookConfig().isActivate()) {
HttpExportRecord exportRecord =
covertToExportRecord(httpConnectRecord, event,
event.getLastResult(), event.getLastException(), url, id);
- ((WebhookHttpSinkHandler)
sinkHandler).addDataToQueue(exportRecord);
+ ((WebhookHttpSinkHandler)
sinkHandler).getReceivedDataQueue().offer(exportRecord);
}
// update the HttpConnectRecord
httpConnectRecord.setTime(LocalDateTime.now().toString());
@@ -159,7 +159,7 @@ public class RetryHttpSinkHandler implements
HttpSinkHandler {
}
if (connectorConfig.getWebhookConfig().isActivate()) {
HttpExportRecord exportRecord =
covertToExportRecord(httpConnectRecord, event, event.getResult(),
event.getException(), url, id);
- ((WebhookHttpSinkHandler)
sinkHandler).addDataToQueue(exportRecord);
+ ((WebhookHttpSinkHandler)
sinkHandler).getReceivedDataQueue().offer(exportRecord);
}
}).build();
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index e07683fcf..9af246bc6 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -18,6 +18,7 @@
package org.apache.eventmesh.connector.http.sink.handle;
import org.apache.eventmesh.common.exception.EventMeshException;
+import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
@@ -30,13 +31,9 @@ import org.apache.commons.lang3.StringUtils;
import java.net.URI;
import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
@@ -73,25 +70,22 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
private HttpServer exportServer;
// store the received data, when webhook is enabled
- private final ConcurrentLinkedQueue<HttpExportRecord> receivedDataQueue;
-
- // the maximum queue size
- private final int maxQueueSize;
-
- // the current queue size
- private final AtomicInteger currentQueueSize;
+ private final SynchronizedCircularFifoQueue<HttpExportRecord>
receivedDataQueue;
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
this.sinkConnectorConfig = sinkConnectorConfig;
this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
- this.maxQueueSize = this.webhookConfig.getMaxStorageSize();
- this.currentQueueSize = new AtomicInteger(0);
- this.receivedDataQueue = new ConcurrentLinkedQueue<>();
+ int maxQueueSize = this.webhookConfig.getMaxStorageSize();
+ this.receivedDataQueue = new
SynchronizedCircularFifoQueue<>(maxQueueSize);
// init the export server
doInitExportServer();
}
+ public SynchronizedCircularFifoQueue<HttpExportRecord>
getReceivedDataQueue() {
+ return receivedDataQueue;
+ }
+
/**
* Initialize the server for exporting the received data
*/
@@ -135,7 +129,7 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
int pageNum = StringUtils.isBlank(pageNumStr) ? 1 :
Integer.parseInt(pageNumStr);
int pageSize = Integer.parseInt(pageSizeStr);
- if (currentQueueSize.get() == 0) {
+ if (receivedDataQueue.isEmpty()) {
ctx.response()
.putHeader(HttpHeaders.CONTENT_TYPE,
"application/json; charset=utf-8")
.setStatusCode(HttpResponseStatus.NO_CONTENT.code())
@@ -148,12 +142,12 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
List<HttpExportRecord> exportRecords;
if (Objects.equals(type, TypeEnum.POLL.getValue())) {
// If the type is poll, only the first page of data is
exported and removed
- exportRecords = getDataFromQueue(0, pageSize, true);
+ exportRecords = receivedDataQueue.fetchRange(0, pageSize,
true);
} else {
// If the type is peek, the specified page of data is
exported without removing
int startIndex = (pageNum - 1) * pageSize;
int endIndex = startIndex + pageSize;
- exportRecords = getDataFromQueue(startIndex, endIndex,
false);
+ exportRecords = receivedDataQueue.fetchRange(startIndex,
endIndex, false);
}
// Create HttpExportRecordPage
@@ -242,63 +236,11 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
// create ExportRecord
HttpExportRecord exportRecord = new
HttpExportRecord(httpExportMetadata, arr.succeeded() ?
arr.result().bodyAsString() : null);
// add the data to the queue
- addDataToQueue(exportRecord);
+ receivedDataQueue.offer(exportRecord);
});
}
- /**
- * Adds the received data to the queue.
- *
- * @param exportRecord the received data to add to the queue
- */
- public void addDataToQueue(HttpExportRecord exportRecord) {
- // If the current queue size is greater than or equal to the maximum
queue size, remove the oldest element
- if (currentQueueSize.get() >= maxQueueSize) {
- Object removedData = receivedDataQueue.poll();
- if (log.isDebugEnabled()) {
- log.debug("The queue is full, remove the oldest element: {}",
removedData);
- } else {
- log.info("The queue is full, remove the oldest element");
- }
- currentQueueSize.decrementAndGet();
- }
- // Try to put the received data into the queue
- if (receivedDataQueue.offer(exportRecord)) {
- currentQueueSize.incrementAndGet();
- log.debug("Successfully put the received data into the queue: {}",
exportRecord);
- } else {
- log.error("Failed to put the received data into the queue: {}",
exportRecord);
- }
- }
-
- /**
- * Gets the received data from the queue.
- *
- * @param startIndex the start index of the data to get
- * @param endIndex the end index of the data to get
- * @param removed whether to remove the data from the queue
- * @return the received data
- */
- private List<HttpExportRecord> getDataFromQueue(int startIndex, int
endIndex, boolean removed) {
- Iterator<HttpExportRecord> iterator = receivedDataQueue.iterator();
-
- List<HttpExportRecord> pageItems = new ArrayList<>(endIndex -
startIndex);
- int count = 0;
- while (iterator.hasNext() && count < endIndex) {
- HttpExportRecord item = iterator.next();
- if (count >= startIndex) {
- pageItems.add(item);
- if (removed) {
- iterator.remove();
- currentQueueSize.decrementAndGet();
- }
- }
- count++;
- }
- return pageItems;
- }
-
/**
* Cleans up and releases resources used by the HTTP/HTTPS handler.
*/
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
index 5f63e9aa3..b976fed9b 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
@@ -20,25 +20,23 @@ package
org.apache.eventmesh.connector.http.source.connector;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
+import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-import org.apache.eventmesh.openconnect.util.CloudEventUtil;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import io.cloudevents.CloudEvent;
-import io.cloudevents.http.vertx.VertxMessageFactory;
-import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;
@@ -47,12 +45,17 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HttpSourceConnector implements Source {
- private static final int DEFAULT_BATCH_SIZE = 10;
-
private HttpSourceConfig sourceConfig;
- private BlockingQueue<CloudEvent> queue;
+
+ private SynchronizedCircularFifoQueue<Object> queue;
+
+ private int batchSize;
+
+ private Protocol protocol;
+
private HttpServer server;
+
@Override
public Class<? extends Config> configClass() {
return HttpSourceConfig.class;
@@ -72,42 +75,32 @@ public class HttpSourceConnector implements Source {
}
private void doInit() {
- this.queue = new LinkedBlockingQueue<>(1000);
+ // init queue
+ int maxQueueSize =
this.sourceConfig.getConnectorConfig().getMaxStorageSize();
+ this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
+
+ // init batch size
+ this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
+
+ // init protocol
+ String protocolName =
this.sourceConfig.getConnectorConfig().getProtocol();
+ this.protocol =
ProtocolFactory.getInstance(this.sourceConfig.connectorConfig, protocolName);
final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
- router.route()
+ final Route route = router.route()
.path(this.sourceConfig.connectorConfig.getPath())
- .method(HttpMethod.POST)
- .handler(LoggerHandler.create())
- .handler(ctx -> {
- VertxMessageFactory.createReader(ctx.request())
- .map(reader -> {
- CloudEvent event = reader.toEvent();
- if (event.getSubject() == null) {
- throw new IllegalStateException("attribute
'subject' cannot be null");
- }
- if (event.getDataContentType() == null) {
- throw new IllegalStateException("attribute
'datacontenttype' cannot be null");
- }
- if (event.getData() == null) {
- throw new IllegalStateException("attribute 'data'
cannot be null");
- }
- return event;
- })
- .onSuccess(event -> {
- queue.add(event);
- log.info("[HttpSourceConnector] Succeed to convert
payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
-
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
- })
- .onFailure(t -> {
- log.error("[HttpSourceConnector] Malformed request.
StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
-
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
- });
- });
+ .handler(LoggerHandler.create());
+
+ // set protocol handler
+ this.protocol.setHandler(route, queue);
+
+ // create server
this.server = vertx.createHttpServer(new HttpServerOptions()
.setPort(this.sourceConfig.connectorConfig.getPort())
-
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
+
.setMaxFormAttributeSize(this.sourceConfig.connectorConfig.getMaxFormAttributeSize())
+ .setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
}
@Override
@@ -138,17 +131,20 @@ public class HttpSourceConnector implements Source {
@Override
public List<ConnectRecord> poll() {
- List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
- for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
- try {
- CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
- if (event == null) {
- break;
- }
- connectRecords.add(CloudEventUtil.convertEventToRecord(event));
- } catch (InterruptedException e) {
+ // if queue is empty, return empty list
+ if (queue.isEmpty()) {
+ return Collections.emptyList();
+ }
+ // poll from queue
+ List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
+ for (int i = 0; i < batchSize; i++) {
+ Object obj = queue.poll();
+ if (obj == null) {
break;
}
+ // convert to ConnectRecord
+ ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
+ connectRecords.add(connectRecord);
}
return connectRecords;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/CommonResponse.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/CommonResponse.java
new file mode 100644
index 000000000..870f2afbe
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/CommonResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Webhook response: a message plus the time it was handled.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CommonResponse implements Serializable {
+
+    private static final long serialVersionUID = 8616938575207104455L;
+
+    private String msg;
+
+    private LocalDateTime handleTime;
+
+    /**
+     * Create a response with the given message, stamped with the current local date-time.
+     *
+     * @param msg message
+     * @return response
+     */
+    public static CommonResponse base(String msg) {
+        final LocalDateTime now = LocalDateTime.now();
+        return new CommonResponse(msg, now);
+    }
+
+    /**
+     * Create a response whose message is {@code "success"}.
+     *
+     * @return response
+     */
+    public static CommonResponse success() {
+        return base("success");
+    }
+
+    /**
+     * Serialize this response to a JSON string with the {@code WriteMapNullValue} feature enabled.
+     *
+     * @return json string
+     */
+    public String toJsonStr() {
+        final String json = JSON.toJSONString(this, Feature.WriteMapNullValue);
+        return json;
+    }
+
+}
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
similarity index 62%
copy from
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
copy to
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
index 4ffd6a134..2fe7399da 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
@@ -15,18 +15,31 @@
* limitations under the License.
*/
-package org.apache.eventmesh.common.config.connector.http;
+package org.apache.eventmesh.connector.http.source.data;
+import java.io.Serializable;
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
+/**
+ * Webhook Protocol Request.
+ */
@Data
-public class SourceConnectorConfig {
+@NoArgsConstructor
+@AllArgsConstructor
+public class WebhookRequest implements Serializable {
+
+ private static final long serialVersionUID = -483500600756490500L;
+
+ private String protocolName;
- private String connectorName;
+ private String url;
- private String path;
+ private Map<String, String> headers;
- private int port;
+ private Object payload;
- private int idleTimeout;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
new file mode 100644
index 000000000..b671383e5
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import io.vertx.ext.web.Route;
+
+
+/**
+ * Contract every HTTP source protocol has to implement.
+ * Implementations are created and initialized through {@code ProtocolFactory}.
+ */
+public interface Protocol {
+
+    /**
+     * Initialize the protocol.
+     * {@code ProtocolFactory.getInstance} invokes this right after creating the instance.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    void initialize(SourceConnectorConfig sourceConnectorConfig);
+
+    /**
+     * Attach this protocol's request handling to the given route.
+     * The queue is the one the connector later polls when building its records.
+     *
+     * @param route route
+     * @param queue queue info
+     */
+    void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue);
+
+    /**
+     * Convert the message to ConnectRecord.
+     * The connector passes in the objects it polls from the queue.
+     *
+     * @param message message
+     * @return ConnectRecord
+     */
+    ConnectRecord convertToConnectRecord(Object message);
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/ProtocolFactory.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/ProtocolFactory.java
new file mode 100644
index 000000000..6e6100e88
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/ProtocolFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import
org.apache.eventmesh.connector.http.source.protocol.impl.CloudEventProtocol;
+import org.apache.eventmesh.connector.http.source.protocol.impl.CommonProtocol;
+import org.apache.eventmesh.connector.http.source.protocol.impl.GitHubProtocol;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Protocol factory. This class is responsible for storing and creating instances of {@link Protocol} classes.
+ */
+public class ProtocolFactory {
+
+    // protocol name (lower-cased with Locale.ROOT) -> protocol class
+    private static final ConcurrentHashMap<String, Class<?>> protocols = new ConcurrentHashMap<>();
+
+    static {
+        // register all protocols
+        registerProtocol(CloudEventProtocol.PROTOCOL_NAME, CloudEventProtocol.class);
+        registerProtocol(GitHubProtocol.PROTOCOL_NAME, GitHubProtocol.class);
+        registerProtocol(CommonProtocol.PROTOCOL_NAME, CommonProtocol.class);
+    }
+
+    /**
+     * Register a protocol
+     *
+     * @param name  name of the protocol
+     * @param clazz class of the protocol
+     * @throws IllegalArgumentException if {@code clazz} does not implement {@link Protocol}
+     */
+    public static void registerProtocol(String name, Class<?> clazz) {
+        if (!Protocol.class.isAssignableFrom(clazz)) {
+            throw new IllegalArgumentException("Class " + clazz.getName() + " does not implement Protocol interface");
+        }
+        // Locale.ROOT keeps the key independent of the JVM default locale (e.g. Turkish dotless i)
+        protocols.put(name.toLowerCase(java.util.Locale.ROOT), clazz);
+    }
+
+    /**
+     * Create and initialize a new instance of the protocol registered under the given name.
+     *
+     * @param sourceConnectorConfig config handed to {@link Protocol#initialize}
+     * @param name                  name of the protocol (case insensitive)
+     * @return a new, initialized protocol instance
+     * @throws IllegalArgumentException if the name is null or unregistered, or the class cannot be instantiated
+     */
+    public static Protocol getInstance(SourceConnectorConfig sourceConnectorConfig, String name) {
+        // get the class by name(case insensitive); a null name is reported as "not registered"
+        Class<?> clazz = Optional.ofNullable(name)
+            .map(key -> protocols.get(key.toLowerCase(java.util.Locale.ROOT)))
+            .orElseThrow(() -> new IllegalArgumentException("Protocol " + name + " is not registered"));
+        try {
+            // Class.newInstance() is deprecated; go through the no-arg constructor instead
+            Protocol protocol = (Protocol) clazz.getDeclaredConstructor().newInstance();
+            protocol.initialize(sourceConnectorConfig);
+            return protocol;
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalArgumentException("Failed to instantiate protocol " + name, e);
+        }
+    }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/WebhookConstants.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/WebhookConstants.java
new file mode 100644
index 000000000..b31637427
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/WebhookConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol;
+
+/**
+ * Header names and related constants used when handling webhook requests.
+ */
+public final class WebhookConstants {
+
+    private WebhookConstants() {
+        // constants holder, not meant to be instantiated
+    }
+
+    /**
+     * -------------------------------------- About GitHub --------------------------------------
+     */
+
+    // A globally unique identifier (GUID) to identify the delivery.
+    public static final String GITHUB_DELIVERY = "X-GitHub-Delivery";
+
+    // This header is sent if the webhook is configured with a secret.
+    // We recommend that you use the more secure X-Hub-Signature-256 instead
+    public static final String GITHUB_SIGNATURE = "X-Hub-Signature";
+
+    // This header is sent if the webhook is configured with a secret
+    public static final String GITHUB_SIGNATURE_256 = "X-Hub-Signature-256";
+
+    // Prefix that precedes the hex digest in the X-Hub-Signature-256 header value.
+    public static final String GITHUB_HASH_256_PREFIX = "sha256=";
+
+    /**
+     * Misspelled name of {@link #GITHUB_HASH_256_PREFIX}, kept so existing references keep compiling.
+     *
+     * @deprecated use {@link #GITHUB_HASH_256_PREFIX}
+     */
+    @Deprecated
+    public static final String GITHUB_HASH_265_PREFIX = GITHUB_HASH_256_PREFIX;
+
+    // The name of the event that triggered the delivery.
+    public static final String GITHUB_EVENT = "X-GitHub-Event";
+
+    // The unique identifier of the webhook.
+    public static final String GITHUB_HOOK_ID = "X-GitHub-Hook-ID";
+
+    // The unique identifier of the resource where the webhook was created.
+    public static final String GITHUB_HOOK_INSTALLATION_TARGET_ID = "X-GitHub-Hook-Installation-Target-ID";
+
+    // The type of resource where the webhook was created.
+    public static final String GITHUB_HOOK_INSTALLATION_TARGET_TYPE = "X-GitHub-Hook-Installation-Target-Type";
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
new file mode 100644
index 000000000..4906e920f
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol.impl;
+
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.http.vertx.VertxMessageFactory;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.Route;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * CloudEvent Protocol. Accepts HTTP POST requests, reads them as CloudEvents through
+ * {@link VertxMessageFactory} and stores the parsed events in the shared queue.
+ */
+@Slf4j
+public class CloudEventProtocol implements Protocol {
+
+    // Protocol name
+    public static final String PROTOCOL_NAME = "CloudEvent";
+
+
+    /**
+     * Initialize the protocol. This protocol reads nothing from the configuration.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        // nothing to initialize
+    }
+
+
+    /**
+     * Handle the protocol message for CloudEvent.
+     * Responds with 200 once the event is stored, 400 when the request cannot be read as a valid
+     * CloudEvent, and 500 when the queue rejects the event.
+     *
+     * @param route route
+     * @param queue queue info
+     */
+    @Override
+    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+            .handler(ctx -> VertxMessageFactory.createReader(ctx.request())
+                .map(reader -> requireMandatoryAttributes(reader.toEvent()))
+                .onSuccess(event -> {
+                    // Add the event to the queue, thread-safe.
+                    // An exception thrown from this callback would not reach onFailure and the request
+                    // would be left without a response, so the failure is answered explicitly.
+                    if (!queue.offer(event)) {
+                        log.error("[HttpSourceConnector] Failed to store the request. StatusCode={}",
+                            HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                        ctx.response()
+                            .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+                            .end(CommonResponse.base("Failed to store the request.").toJsonStr());
+                        return;
+                    }
+                    log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
+                    ctx.response()
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .end(CommonResponse.success().toJsonStr());
+                })
+                .onFailure(t -> {
+                    log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
+                    ctx.response()
+                        .setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+                        .end(CommonResponse.base(t.getMessage()).toJsonStr());
+                }));
+    }
+
+    /**
+     * Check that the attributes this connector requires (subject, datacontenttype, data) are present.
+     *
+     * @param event parsed event
+     * @return the same event when all required attributes are present
+     * @throws IllegalStateException if one of the required attributes is null
+     */
+    private static CloudEvent requireMandatoryAttributes(CloudEvent event) {
+        if (event.getSubject() == null) {
+            throw new IllegalStateException("attribute 'subject' cannot be null");
+        }
+        if (event.getDataContentType() == null) {
+            throw new IllegalStateException("attribute 'datacontenttype' cannot be null");
+        }
+        if (event.getData() == null) {
+            throw new IllegalStateException("attribute 'data' cannot be null");
+        }
+        return event;
+    }
+
+    /**
+     * Convert the message to ConnectRecord.
+     *
+     * @param message message, expected to be the {@link CloudEvent} stored by the route handler
+     * @return ConnectRecord
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        return CloudEventUtil.convertEventToRecord((CloudEvent) message);
+    }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
new file mode 100644
index 000000000..80e4f0a75
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol.impl;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
+import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.handler.BodyHandler;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common Protocol. This class represents the common webhook protocol. The processing method of this class does not perform any other operations
+ * except storing the request and returning a general response.
+ */
+@Slf4j
+public class CommonProtocol implements Protocol {
+
+    public static final String PROTOCOL_NAME = "Common";
+
+    /**
+     * Initialize the protocol. This protocol reads nothing from the configuration.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        // nothing to initialize
+    }
+
+    /**
+     * Set the handler for the route. Each POST request is wrapped into a {@link WebhookRequest}
+     * (protocol name, absolute URI, headers, body) and stored in the queue.
+     *
+     * @param route route
+     * @param queue queue info
+     */
+    @Override
+    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+            .handler(BodyHandler.create())
+            .handler(ctx -> {
+                // Get the payload
+                String payloadStr = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+
+                // Create and store the webhook request.
+                // A header name may legally occur more than once; join the values instead of
+                // letting Collectors.toMap fail on the duplicate key.
+                Map<String, String> headerMap = ctx.request().headers().entries().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first + "," + second));
+                WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr);
+                if (!queue.offer(webhookRequest)) {
+                    throw new IllegalStateException("Failed to store the request.");
+                }
+
+                // Return 200 OK
+                ctx.response()
+                    .setStatusCode(HttpResponseStatus.OK.code())
+                    .end(CommonResponse.success().toJsonStr());
+            })
+            .failureHandler(ctx -> {
+                Throwable failure = ctx.failure();
+                log.error("Failed to handle the request. ", failure);
+
+                // A failure raised with only a status code carries no throwable, and a failure raised
+                // with only a throwable may carry no usable status code; fall back in both cases.
+                int statusCode = ctx.statusCode() > 0 ? ctx.statusCode() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
+                String message = failure != null ? failure.getMessage() : HttpResponseStatus.valueOf(statusCode).reasonPhrase();
+
+                // Return the error response
+                ctx.response()
+                    .setStatusCode(statusCode)
+                    .end(CommonResponse.base(message).toJsonStr());
+            });
+
+    }
+
+    /**
+     * Convert the message to a connect record. The payload becomes the record data; protocol name,
+     * URL and headers are attached as the extensions "source", "url" and "headers".
+     *
+     * @param message message, expected to be the {@link WebhookRequest} stored by the route handler
+     * @return connect record
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        WebhookRequest request = (WebhookRequest) message;
+        ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), request.getPayload());
+        connectRecord.addExtension("source", request.getProtocolName());
+        connectRecord.addExtension("url", request.getUrl());
+        connectRecord.addExtension("headers", request.getHeaders());
+        return connectRecord;
+    }
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
new file mode 100644
index 000000000..e86efcbf3
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol.impl;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
+import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.connector.http.source.protocol.WebhookConstants;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * GitHub Protocol. This class represents the GitHub webhook protocol: it checks the content type and
+ * the HMAC-SHA256 signature of each POST request before storing it in the queue.
+ */
+@Slf4j
+public class GitHubProtocol implements Protocol {
+
+    // Protocol name
+    public static final String PROTOCOL_NAME = "GitHub";
+
+    private static final String HMAC_SHA_256 = "HmacSHA256";
+
+    private static final String SECRET_KEY = "secret";
+
+    private String contentType = "application/json";
+
+    private String secret;
+
+
+    /**
+     * Initialize the protocol from the connector's extra config ("secret" and optional "contentType").
+     *
+     * @param sourceConnectorConfig source connector config
+     * @throws EventMeshException if no non-blank secret is configured
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        // Initialize the protocol
+        Map<String, String> extraConfig = sourceConnectorConfig.getExtraConfig();
+        // set the secret, if it is not set, throw an exception (a missing extraConfig counts as "not set")
+        this.secret = extraConfig == null ? null : extraConfig.get(SECRET_KEY);
+        if (StringUtils.isBlank(this.secret)) {
+            throw new EventMeshException("The secret is required for GitHub protocol.");
+        }
+        // if the content-type is not set, use the default value
+        this.contentType = extraConfig.getOrDefault("contentType", contentType);
+    }
+
+    /**
+     * Handle the protocol message for GitHub.
+     * Requests with an unexpected content type or an invalid signature are failed with 400.
+     *
+     * @param route route
+     * @param queue queue info
+     */
+    @Override
+    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+            .handler(BodyHandler.create())
+            .handler(ctx -> {
+                // Get the payload and headers
+                String payloadStr = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+                MultiMap headers = ctx.request().headers();
+
+                // validate the content type
+                if (!StringUtils.contains(headers.get("Content-Type"), contentType)) {
+                    String errorMsg = String.format("content-type is invalid, please check the content-type. received content-type: %s",
+                        headers.get("Content-Type"));
+                    // Return Bad Request
+                    ctx.fail(HttpResponseStatus.BAD_REQUEST.code(), new EventMeshException(errorMsg));
+                    return;
+                }
+
+                // validate the signature
+                String signature = headers.get(WebhookConstants.GITHUB_SIGNATURE_256);
+                if (BooleanUtils.isFalse(validateSignature(signature, payloadStr, secret))) {
+                    String errorMsg = String.format("signature is invalid, please check the secret. received signature: %s", signature);
+                    // Return Bad Request
+                    ctx.fail(HttpResponseStatus.BAD_REQUEST.code(), new EventMeshException(errorMsg));
+                    return;
+                }
+
+                // if the content type is form data, convert it to json string
+                if (StringUtils.contains(contentType, "application/x-www-form-urlencoded")
+                    || StringUtils.contains(contentType, "multipart/form-data")) {
+                    /*
+                       Convert form data to json string. There are the following benefits:
+                       1. Raw form data is not decoded, so it is not easy to process directly.
+                       2. Converted to reduce storage space by more than 20 percent. Experimental result: 10329 bytes -> 7893 bytes.
+                     */
+                    JSONObject payloadObj = new JSONObject();
+                    ctx.request().formAttributes().forEach(entry -> payloadObj.put(entry.getKey(), entry.getValue()));
+                    payloadStr = payloadObj.toJSONString();
+                }
+
+                // Create and store the webhook request.
+                // A header name may legally occur more than once; join the values instead of
+                // letting Collectors.toMap fail on the duplicate key.
+                Map<String, String> headerMap = headers.entries().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first + "," + second));
+                WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr);
+
+                if (!queue.offer(webhookRequest)) {
+                    throw new IllegalStateException("Failed to store the request.");
+                }
+
+                // Return 200 OK
+                ctx.response()
+                    .setStatusCode(HttpResponseStatus.OK.code())
+                    .end(CommonResponse.success().toJsonStr());
+            })
+            .failureHandler(ctx -> {
+                Throwable failure = ctx.failure();
+                log.error("Failed to handle the request from github. ", failure);
+
+                // A failure raised with only a status code carries no throwable, and a failure raised
+                // with only a throwable may carry no usable status code; fall back in both cases.
+                int statusCode = ctx.statusCode() > 0 ? ctx.statusCode() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
+                String message = failure != null ? failure.getMessage() : HttpResponseStatus.valueOf(statusCode).reasonPhrase();
+
+                // Return the error response
+                ctx.response()
+                    .setStatusCode(statusCode)
+                    .end(CommonResponse.base(message).toJsonStr());
+            });
+    }
+
+
+    /**
+     * Validate the signature: compute "sha256=" + hex(HMAC-SHA256(secret, payload)) and compare it
+     * with the received signature.
+     *
+     * @param signature signature received with the request, may be null
+     * @param payload   payload
+     * @param secret    secret
+     * @return true if the received signature equals the computed one, false otherwise (including a null signature)
+     * @throws EventMeshException if the HMAC cannot be computed
+     */
+    public boolean validateSignature(String signature, String payload, String secret) {
+        String hash = WebhookConstants.GITHUB_HASH_265_PREFIX;
+        try {
+            Mac sha = Mac.getInstance(HMAC_SHA_256);
+            SecretKeySpec secretKey = new SecretKeySpec(secret.getBytes(Constants.DEFAULT_CHARSET), HMAC_SHA_256);
+            sha.init(secretKey);
+            byte[] bytes = sha.doFinal(payload.getBytes(Constants.DEFAULT_CHARSET));
+            hash += byteArrayToHexString(bytes);
+        } catch (Exception e) {
+            throw new EventMeshException("Error occurred while validating the signature.", e);
+        }
+
+        if (signature == null) {
+            return false;
+        }
+        // MessageDigest.isEqual compares in constant time, so the response time does not reveal
+        // how many leading characters of a forged signature were correct.
+        return java.security.MessageDigest.isEqual(
+            hash.getBytes(Constants.DEFAULT_CHARSET), signature.getBytes(Constants.DEFAULT_CHARSET));
+    }
+
+
+    /**
+     * Convert the byte array to a lower-case hex string, two characters per byte.
+     *
+     * @param bytes bytes, may be null
+     * @return hex string, or an empty string for null input
+     */
+    private String byteArrayToHexString(byte[] bytes) {
+        if (bytes == null) {
+            return "";
+        }
+
+        StringBuilder sb = new StringBuilder(bytes.length * 2);
+        for (byte b : bytes) {
+            String hex = Integer.toHexString(0xFF & b);
+            if (hex.length() == 1) {
+                // If the length is 1, append 0
+                sb.append('0');
+            }
+            sb.append(hex);
+        }
+
+        return sb.toString();
+    }
+
+
+    /**
+     * Convert the message to ConnectRecord. The payload becomes the record data; delivery id, event
+     * name, protocol name and installation target type are attached as the extensions
+     * "id", "topic", "source" and "type".
+     *
+     * @param message message, expected to be the {@link WebhookRequest} stored by the route handler
+     * @return ConnectRecord
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        WebhookRequest request = (WebhookRequest) message;
+        Map<String, String> headers = request.getHeaders();
+
+        // Create the ConnectRecord
+        ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), request.getPayload());
+        connectRecord.addExtension("id", headers.get(WebhookConstants.GITHUB_DELIVERY));
+        connectRecord.addExtension("topic", headers.get(WebhookConstants.GITHUB_EVENT));
+        // the source is the protocol name itself, not a header named after the protocol
+        connectRecord.addExtension("source", request.getProtocolName());
+        connectRecord.addExtension("type", headers.get(WebhookConstants.GITHUB_HOOK_INSTALLATION_TARGET_TYPE));
+        return connectRecord;
+    }
+
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
index 9fcc471d3..b1edc084f 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
@@ -28,4 +28,11 @@ connectorConfig:
connectorName: httpSource
path: /test
port: 3755
- idleTimeout: 5
\ No newline at end of file
+ idleTimeout: 5000 # timeunit: ms
+ maxFormAttributeSize: 1048576 # unit: bytes, default: 1048576 (1 MB).
This applies only when handling form data submissions.
+ maxStorageSize: 1000 # max storage size, default: 1000
+ batchSize: 10 # batch size, default: 10
+ protocol: CloudEvent # Case insensitive, default: CloudEvent, options:
CloudEvent, GitHub, Common
+ extraConfig: # extra config for different protocol, e.g. GitHub secret
+ secret: xxxxxxx # GitHub secret
+ contentType: application/json # GitHub content type
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
index 90bad0832..b764b4a98 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
@@ -17,22 +17,16 @@
package org.apache.eventmesh.connector.http.source.connector;
+
import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-
+import java.net.URL;
import java.util.List;
+import java.util.Objects;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
@@ -40,12 +34,19 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
class HttpSourceConnectorTest {
private HttpSourceConnector connector;
private SourceConnectorConfig config;
- private CloseableHttpClient httpClient;
- private String uri;
+ private OkHttpClient httpClient;
+ private String url;
private final String expectedMessage = "testHttpMessage";
@BeforeEach
@@ -56,9 +57,8 @@ class HttpSourceConnectorTest {
connector.init(sourceConfig);
connector.start();
- uri = new
URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();
-
- httpClient = HttpClients.createDefault();
+ url = new URL("http", "127.0.0.1", config.getPort(),
config.getPath()).toString();
+ httpClient = new OkHttpClient();
}
@Test
@@ -66,9 +66,9 @@ class HttpSourceConnectorTest {
final int batchSize = 10;
// test binary content mode
for (int i = 0; i < batchSize; i++) {
- HttpResponse resp = mockBinaryRequest();
- Assertions.assertEquals(resp.getStatusLine().getStatusCode(),
HttpStatus.SC_OK);
-
+ try (Response resp = mockBinaryRequest()) {
+ Assertions.assertEquals(200, resp.code());
+ }
}
List<ConnectRecord> res = connector.poll();
Assertions.assertEquals(batchSize, res.size());
@@ -78,8 +78,9 @@ class HttpSourceConnectorTest {
// test structured content mode
for (int i = 0; i < batchSize; i++) {
- HttpResponse resp = mockStructuredRequest();
- Assertions.assertEquals(resp.getStatusLine().getStatusCode(),
HttpStatus.SC_OK);
+ try (Response resp = mockStructuredRequest()) {
+ Assertions.assertEquals(200, resp.code());
+ }
}
res = connector.poll();
Assertions.assertEquals(batchSize, res.size());
@@ -88,30 +89,39 @@ class HttpSourceConnectorTest {
}
// test invalid requests
- HttpPost invalidPost = new HttpPost(uri);
- invalidPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
- invalidPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
- HttpResponse resp = httpClient.execute(invalidPost);
- Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST,
resp.getStatusLine().getStatusCode());
+ Request request = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "text/plain")
+ .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
+ .build();
+
+ try (Response resp = httpClient.newCall(request).execute()) {
+ // verify the response code
+ Assertions.assertEquals(405, resp.code());
+ }
+
}
- HttpResponse mockBinaryRequest() throws Exception {
- HttpPost httpPost = new HttpPost(uri);
- httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
- httpPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
- httpPost.setHeader("ce-specversion", "1.0");
- httpPost.setHeader("ce-type", "com.example.someevent");
- httpPost.setHeader("ce-source", "/mycontext");
- httpPost.setHeader("ce-subject", "test");
- httpPost.setEntity(new StringEntity(expectedMessage));
-
- return httpClient.execute(httpPost);
+ Response mockBinaryRequest() throws Exception {
+
+ RequestBody body = RequestBody.create(expectedMessage,
MediaType.parse("text/plain"));
+
+ Request request = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "text/plain")
+ .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
+ .addHeader("ce-specversion", "1.0")
+ .addHeader("ce-type", "com.example.someevent")
+ .addHeader("ce-source", "/mycontext")
+ .addHeader("ce-subject", "test")
+ .post(body)
+ .build();
+
+ return httpClient.newCall(request).execute();
}
- HttpResponse mockStructuredRequest() throws Exception {
- HttpPost httpPost = new HttpPost(uri);
- // according to the CloudEvent specification, a json format event MUST
use the media type `application/cloudevents+json`
- httpPost.setHeader(HttpHeaders.CONTENT_TYPE,
"application/cloudevents+json");
+ Response mockStructuredRequest() throws Exception {
+ // create a CloudEvent
TestEvent event = new TestEvent();
event.id = String.valueOf(UUID.randomUUID());
event.specversion = "1.0";
@@ -120,15 +130,23 @@ class HttpSourceConnectorTest {
event.subject = "test";
event.datacontenttype = "text/plain";
event.data = expectedMessage;
- httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
- return httpClient.execute(httpPost);
+ RequestBody body =
RequestBody.create(Objects.requireNonNull(JsonUtils.toJSONString(event)),
MediaType.parse("application/cloudevents+json"));
+
+ Request request = new Request.Builder()
+ .url(url)
+ .addHeader("Content-Type", "application/cloudevents+json")
+ .post(body)
+ .build();
+
+ return httpClient.newCall(request).execute();
+
}
@AfterEach
- void tearDown() throws Exception {
+ void tearDown() {
connector.stop();
- httpClient.close();
+ httpClient.dispatcher().executorService().shutdown();
}
class TestEvent {
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
index 0a3e68d07..735d3b01d 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
+++
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
@@ -27,4 +27,12 @@ pubSubConfig:
connectorConfig:
connectorName: httpSource
path: /test
- port: 3755
\ No newline at end of file
+ port: 3755
+ idleTimeout: 5000 # unit: ms
+ maxFormAttributeSize: 1048576 # unit: bytes, default: 1048576 (1 MB).
This applies only when handling form data submissions.
+ maxStorageSize: 1000 # max storage size, default: 1000
+ batchSize: 10 # batch size, default: 10
+ protocol: CloudEvent # Case insensitive, default: CloudEvent, options:
CloudEvent, GitHub, Common
+ extraConfig: # extra config for different protocol, e.g. GitHub secret
+ secret: xxxxxxx # GitHub secret
+ contentType: application/json # GitHub content type
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]