This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 996c439e4 [ISSUE #4869] Add Webhook support for HTTP Source Connector (#4913)
996c439e4 is described below

commit 996c439e457cd57d5c1b21341e55ea44ad58999e
Author: Zaki <[email protected]>
AuthorDate: Thu Jun 27 14:31:36 2024 +0800

    [ISSUE #4869] Add Webhook support for HTTP Source Connector (#4913)
    
    * feat: add webhook support
    
    * feat: add CommonProtocol and optimize some logic
    
    * doc: update some javaDoc
    
    * fix: update something
    
    * fix: Optimize queue related operations
    
    * fix: Optimize queue related operations
    
    * fix: Optimize queue related operations
    
    * fix: update something
    
    * fix: Remove some meaningless code
    
    * fix: update something
    
    * perf: Updates to protocol-related code, with a focus on request handling
    
    * perf: use CircularFifoQueue
    
    * perf: Use OkHttp Client to completely replace Apache Client and resolve code conflicts.
---
 .../connector/http/SourceConnectorConfig.java      |  28 ++-
 .../eventmesh-connector-http/build.gradle          |   1 -
 .../http/common/SynchronizedCircularFifoQueue.java | 148 ++++++++++++++
 .../http/sink/handle/RetryHttpSinkHandler.java     |   6 +-
 .../http/sink/handle/WebhookHttpSinkHandler.java   |  82 ++------
 .../http/source/connector/HttpSourceConnector.java |  94 ++++-----
 .../connector/http/source/data/CommonResponse.java |  74 +++++++
 .../connector/http/source/data/WebhookRequest.java |  25 ++-
 .../connector/http/source/protocol/Protocol.java   |  58 ++++++
 .../http/source/protocol/ProtocolFactory.java      |  80 ++++++++
 .../http/source/protocol/WebhookConstants.java     |  50 +++++
 .../source/protocol/impl/CloudEventProtocol.java   | 107 ++++++++++
 .../http/source/protocol/impl/CommonProtocol.java  | 111 ++++++++++
 .../http/source/protocol/impl/GitHubProtocol.java  | 227 +++++++++++++++++++++
 .../src/main/resources/source-config.yml           |   9 +-
 .../source/connector/HttpSourceConnectorTest.java  | 104 ++++++----
 .../src/test/resources/source-config.yml           |  10 +-
 17 files changed, 1039 insertions(+), 175 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 4ffd6a134..4f69f5504 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -17,6 +17,9 @@
 
 package org.apache.eventmesh.common.config.connector.http;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import lombok.Data;
 
 @Data
@@ -28,5 +31,28 @@ public class SourceConnectorConfig {
 
     private int port;
 
-    private int idleTimeout;
+    // timeunit: ms, default 5000ms
+    private int idleTimeout = 5000;
+
+    /**
+     * <ul>
+     *     <li>The maximum size allowed for form attributes when Content-Type 
is application/x-www-form-urlencoded or multipart/form-data </li>
+     *     <li>Default is 1MB (1024 * 1024 bytes). </li>
+     *     <li>If you receive a "size exceed allowed maximum capacity" error, 
you can increase this value. </li>
+     *     <li>Note: This applies only when handling form data 
submissions.</li>
+     * </ul>
+     */
+    private int maxFormAttributeSize = 1024 * 1024;
+
+    // max size of the queue, default 1000
+    private int maxStorageSize = 1000;
+
+    // batch size, default 10
+    private int batchSize = 10;
+
+    // protocol, default CloudEvent
+    private String protocol = "CloudEvent";
+
+    // extra config, e.g. GitHub secret
+    private Map<String, String> extraConfig = new HashMap<>();
 }
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle 
b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index a6086bb1e..b64f7903b 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -24,7 +24,6 @@ dependencies {
     implementation 'io.vertx:vertx-web-client:4.5.8'
     implementation 'dev.failsafe:failsafe:3.3.2'
 
-    testImplementation "org.apache.httpcomponents:httpclient"
     testImplementation 'org.mock-server:mockserver-netty:5.15.0'
     testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
     compileOnly 'org.projectlombok:lombok'
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
new file mode 100644
index 000000000..439a9f3d7
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.common;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * SynchronizedCircularFifoQueue is a synchronized version of 
CircularFifoQueue.
+ */
+public class SynchronizedCircularFifoQueue<E> extends CircularFifoQueue<E> {
+
+    /**
+     * <p>Default constructor. capacity = 32</p>
+     */
+    public SynchronizedCircularFifoQueue() {
+        super();
+    }
+
+    public SynchronizedCircularFifoQueue(Collection<? extends E> coll) {
+        super(coll);
+    }
+
+    public SynchronizedCircularFifoQueue(int size) {
+        super(size);
+    }
+
+    @Override
+    public synchronized boolean add(E element) {
+        return super.add(element);
+    }
+
+    @Override
+    public synchronized void clear() {
+        super.clear();
+    }
+
+    @Override
+    public synchronized E element() {
+        return super.element();
+    }
+
+    @Override
+    public synchronized E get(int index) {
+        return super.get(index);
+    }
+
+    @Override
+    public synchronized boolean isAtFullCapacity() {
+        return super.isAtFullCapacity();
+    }
+
+    @Override
+    public synchronized boolean isEmpty() {
+        return super.isEmpty();
+    }
+
+    @Override
+    public synchronized boolean isFull() {
+        return super.isFull();
+    }
+
+    @Override
+    public synchronized int maxSize() {
+        return super.maxSize();
+    }
+
+    @Override
+    public synchronized boolean offer(E element) {
+        return super.offer(element);
+    }
+
+    @Override
+    public synchronized E peek() {
+        return super.peek();
+    }
+
+    @Override
+    public synchronized E poll() {
+        return super.poll();
+    }
+
+    @Override
+    public synchronized E remove() {
+        return super.remove();
+    }
+
+    @Override
+    public synchronized int size() {
+        return super.size();
+    }
+
+    /**
+     * <p>Fetch a range of elements from the queue.</p>
+     *
+     * @param start   start index
+     * @param end     end index
+     * @param removed whether to remove the elements from the queue
+     * @return list of elements
+     */
+    public synchronized List<E> fetchRange(int start, int end, boolean 
removed) {
+
+        if (start < 0 || end > this.size() || start > end) {
+            throw new IllegalArgumentException("Invalid range");
+        }
+
+        Iterator<E> iterator = this.iterator();
+        List<E> items = new ArrayList<>(end - start);
+
+        int count = 0;
+        while (iterator.hasNext() && count < end) {
+            E item = iterator.next();
+            if (item != null && count >= start) {
+                // Add the element to the list
+                items.add(item);
+                if (removed) {
+                    // Remove the element from the queue
+                    iterator.remove();
+                }
+            }
+            count++;
+        }
+        return items;
+
+    }
+
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
index 06700261d..bc2a53610 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
@@ -132,7 +132,7 @@ public class RetryHttpSinkHandler implements 
HttpSinkHandler {
                     // convert the result to an HttpExportRecord
                     HttpExportRecord exportRecord = 
covertToExportRecord(httpConnectRecord, event, event.getResult(), 
event.getException(), url, id);
                     // add the data to the queue
-                    ((WebhookHttpSinkHandler) 
sinkHandler).addDataToQueue(exportRecord);
+                    ((WebhookHttpSinkHandler) 
sinkHandler).getReceivedDataQueue().offer(exportRecord);
                 }
             })
             .onRetry(event -> {
@@ -144,7 +144,7 @@ public class RetryHttpSinkHandler implements 
HttpSinkHandler {
                 if (connectorConfig.getWebhookConfig().isActivate()) {
                     HttpExportRecord exportRecord =
                         covertToExportRecord(httpConnectRecord, event, 
event.getLastResult(), event.getLastException(), url, id);
-                    ((WebhookHttpSinkHandler) 
sinkHandler).addDataToQueue(exportRecord);
+                    ((WebhookHttpSinkHandler) 
sinkHandler).getReceivedDataQueue().offer(exportRecord);
                 }
                 // update the HttpConnectRecord
                 httpConnectRecord.setTime(LocalDateTime.now().toString());
@@ -159,7 +159,7 @@ public class RetryHttpSinkHandler implements 
HttpSinkHandler {
                 }
                 if (connectorConfig.getWebhookConfig().isActivate()) {
                     HttpExportRecord exportRecord = 
covertToExportRecord(httpConnectRecord, event, event.getResult(), 
event.getException(), url, id);
-                    ((WebhookHttpSinkHandler) 
sinkHandler).addDataToQueue(exportRecord);
+                    ((WebhookHttpSinkHandler) 
sinkHandler).getReceivedDataQueue().offer(exportRecord);
                 }
             }).build();
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index e07683fcf..9af246bc6 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -18,6 +18,7 @@
 package org.apache.eventmesh.connector.http.sink.handle;
 
 import org.apache.eventmesh.common.exception.EventMeshException;
+import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
 import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
 import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
@@ -30,13 +31,9 @@ import org.apache.commons.lang3.StringUtils;
 
 import java.net.URI;
 import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.Future;
@@ -73,25 +70,22 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
     private HttpServer exportServer;
 
     // store the received data, when webhook is enabled
-    private final ConcurrentLinkedQueue<HttpExportRecord> receivedDataQueue;
-
-    // the maximum queue size
-    private final int maxQueueSize;
-
-    // the current queue size
-    private final AtomicInteger currentQueueSize;
+    private final SynchronizedCircularFifoQueue<HttpExportRecord> 
receivedDataQueue;
 
     public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
         super(sinkConnectorConfig);
         this.sinkConnectorConfig = sinkConnectorConfig;
         this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
-        this.maxQueueSize = this.webhookConfig.getMaxStorageSize();
-        this.currentQueueSize = new AtomicInteger(0);
-        this.receivedDataQueue = new ConcurrentLinkedQueue<>();
+        int maxQueueSize = this.webhookConfig.getMaxStorageSize();
+        this.receivedDataQueue = new 
SynchronizedCircularFifoQueue<>(maxQueueSize);
         // init the export server
         doInitExportServer();
     }
 
+    public SynchronizedCircularFifoQueue<HttpExportRecord> 
getReceivedDataQueue() {
+        return receivedDataQueue;
+    }
+
     /**
      * Initialize the server for exporting the received data
      */
@@ -135,7 +129,7 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
                 int pageNum = StringUtils.isBlank(pageNumStr) ? 1 : 
Integer.parseInt(pageNumStr);
                 int pageSize = Integer.parseInt(pageSizeStr);
 
-                if (currentQueueSize.get() == 0) {
+                if (receivedDataQueue.isEmpty()) {
                     ctx.response()
                         .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
                         .setStatusCode(HttpResponseStatus.NO_CONTENT.code())
@@ -148,12 +142,12 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
                 List<HttpExportRecord> exportRecords;
                 if (Objects.equals(type, TypeEnum.POLL.getValue())) {
                     // If the type is poll, only the first page of data is 
exported and removed
-                    exportRecords = getDataFromQueue(0, pageSize, true);
+                    exportRecords = receivedDataQueue.fetchRange(0, pageSize, 
true);
                 } else {
                     // If the type is peek, the specified page of data is 
exported without removing
                     int startIndex = (pageNum - 1) * pageSize;
                     int endIndex = startIndex + pageSize;
-                    exportRecords = getDataFromQueue(startIndex, endIndex, 
false);
+                    exportRecords = receivedDataQueue.fetchRange(startIndex, 
endIndex, false);
                 }
 
                 // Create HttpExportRecordPage
@@ -242,63 +236,11 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
             // create ExportRecord
             HttpExportRecord exportRecord = new 
HttpExportRecord(httpExportMetadata, arr.succeeded() ? 
arr.result().bodyAsString() : null);
             // add the data to the queue
-            addDataToQueue(exportRecord);
+            receivedDataQueue.offer(exportRecord);
         });
     }
 
 
-    /**
-     * Adds the received data to the queue.
-     *
-     * @param exportRecord the received data to add to the queue
-     */
-    public void addDataToQueue(HttpExportRecord exportRecord) {
-        // If the current queue size is greater than or equal to the maximum 
queue size, remove the oldest element
-        if (currentQueueSize.get() >= maxQueueSize) {
-            Object removedData = receivedDataQueue.poll();
-            if (log.isDebugEnabled()) {
-                log.debug("The queue is full, remove the oldest element: {}", 
removedData);
-            } else {
-                log.info("The queue is full, remove the oldest element");
-            }
-            currentQueueSize.decrementAndGet();
-        }
-        // Try to put the received data into the queue
-        if (receivedDataQueue.offer(exportRecord)) {
-            currentQueueSize.incrementAndGet();
-            log.debug("Successfully put the received data into the queue: {}", 
exportRecord);
-        } else {
-            log.error("Failed to put the received data into the queue: {}", 
exportRecord);
-        }
-    }
-
-    /**
-     * Gets the received data from the queue.
-     *
-     * @param startIndex the start index of the data to get
-     * @param endIndex   the end index of the data to get
-     * @param removed    whether to remove the data from the queue
-     * @return the received data
-     */
-    private List<HttpExportRecord> getDataFromQueue(int startIndex, int 
endIndex, boolean removed) {
-        Iterator<HttpExportRecord> iterator = receivedDataQueue.iterator();
-
-        List<HttpExportRecord> pageItems = new ArrayList<>(endIndex - 
startIndex);
-        int count = 0;
-        while (iterator.hasNext() && count < endIndex) {
-            HttpExportRecord item = iterator.next();
-            if (count >= startIndex) {
-                pageItems.add(item);
-                if (removed) {
-                    iterator.remove();
-                    currentQueueSize.decrementAndGet();
-                }
-            }
-            count++;
-        }
-        return pageItems;
-    }
-
     /**
      * Cleans up and releases resources used by the HTTP/HTTPS handler.
      */
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
index 5f63e9aa3..b976fed9b 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
@@ -20,25 +20,23 @@ package 
org.apache.eventmesh.connector.http.source.connector;
 import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
 import org.apache.eventmesh.common.exception.EventMeshException;
+import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.connector.http.source.protocol.ProtocolFactory;
 import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
 import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
 import org.apache.eventmesh.openconnect.api.source.Source;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-import org.apache.eventmesh.openconnect.util.CloudEventUtil;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import io.cloudevents.CloudEvent;
-import io.cloudevents.http.vertx.VertxMessageFactory;
-import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.handler.LoggerHandler;
 
@@ -47,12 +45,17 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class HttpSourceConnector implements Source {
 
-    private static final int DEFAULT_BATCH_SIZE = 10;
-
     private HttpSourceConfig sourceConfig;
-    private BlockingQueue<CloudEvent> queue;
+
+    private SynchronizedCircularFifoQueue<Object> queue;
+
+    private int batchSize;
+
+    private Protocol protocol;
+
     private HttpServer server;
 
+
     @Override
     public Class<? extends Config> configClass() {
         return HttpSourceConfig.class;
@@ -72,42 +75,32 @@ public class HttpSourceConnector implements Source {
     }
 
     private void doInit() {
-        this.queue = new LinkedBlockingQueue<>(1000);
+        // init queue
+        int maxQueueSize = 
this.sourceConfig.getConnectorConfig().getMaxStorageSize();
+        this.queue = new SynchronizedCircularFifoQueue<>(maxQueueSize);
+
+        // init batch size
+        this.batchSize = this.sourceConfig.getConnectorConfig().getBatchSize();
+
+        // init protocol
+        String protocolName = 
this.sourceConfig.getConnectorConfig().getProtocol();
+        this.protocol = 
ProtocolFactory.getInstance(this.sourceConfig.connectorConfig, protocolName);
 
         final Vertx vertx = Vertx.vertx();
         final Router router = Router.router(vertx);
-        router.route()
+        final Route route = router.route()
             .path(this.sourceConfig.connectorConfig.getPath())
-            .method(HttpMethod.POST)
-            .handler(LoggerHandler.create())
-            .handler(ctx -> {
-                VertxMessageFactory.createReader(ctx.request())
-                    .map(reader -> {
-                        CloudEvent event = reader.toEvent();
-                        if (event.getSubject() == null) {
-                            throw new IllegalStateException("attribute 
'subject' cannot be null");
-                        }
-                        if (event.getDataContentType() == null) {
-                            throw new IllegalStateException("attribute 
'datacontenttype' cannot be null");
-                        }
-                        if (event.getData() == null) {
-                            throw new IllegalStateException("attribute 'data' 
cannot be null");
-                        }
-                        return event;
-                    })
-                    .onSuccess(event -> {
-                        queue.add(event);
-                        log.info("[HttpSourceConnector] Succeed to convert 
payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
-                        
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
-                    })
-                    .onFailure(t -> {
-                        log.error("[HttpSourceConnector] Malformed request. 
StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
-                        
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
-                    });
-            });
+            .handler(LoggerHandler.create());
+
+        // set protocol handler
+        this.protocol.setHandler(route, queue);
+
+        // create server
         this.server = vertx.createHttpServer(new HttpServerOptions()
             .setPort(this.sourceConfig.connectorConfig.getPort())
-            
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
+            
.setMaxFormAttributeSize(this.sourceConfig.connectorConfig.getMaxFormAttributeSize())
+            .setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
     }
 
     @Override
@@ -138,17 +131,20 @@ public class HttpSourceConnector implements Source {
 
     @Override
     public List<ConnectRecord> poll() {
-        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
-        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
-            try {
-                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
-                if (event == null) {
-                    break;
-                }
-                connectRecords.add(CloudEventUtil.convertEventToRecord(event));
-            } catch (InterruptedException e) {
+        // if queue is empty, return empty list
+        if (queue.isEmpty()) {
+            return Collections.emptyList();
+        }
+        // poll from queue
+        List<ConnectRecord> connectRecords = new ArrayList<>(batchSize);
+        for (int i = 0; i < batchSize; i++) {
+            Object obj = queue.poll();
+            if (obj == null) {
                 break;
             }
+            // convert to ConnectRecord
+            ConnectRecord connectRecord = protocol.convertToConnectRecord(obj);
+            connectRecords.add(connectRecord);
         }
         return connectRecords;
     }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/CommonResponse.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/CommonResponse.java
new file mode 100644
index 000000000..870f2afbe
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/CommonResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.data;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONWriter.Feature;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Webhook response.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class CommonResponse implements Serializable {
+
+    private static final long serialVersionUID = 8616938575207104455L;
+
+    private String msg;
+
+    private LocalDateTime handleTime;
+
+    /**
+     * Convert to json string.
+     *
+     * @return json string
+     */
+    public String toJsonStr() {
+        return JSON.toJSONString(this, Feature.WriteMapNullValue);
+    }
+
+
+    /**
+     * Create a success response.
+     *
+     * @return response
+     */
+    public static CommonResponse success() {
+        return base("success");
+    }
+
+
+    /**
+     * Create a base response.
+     *
+     * @param msg message
+     * @return response
+     */
+    public static CommonResponse base(String msg) {
+        return new CommonResponse(msg, LocalDateTime.now());
+    }
+
+}
diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
similarity index 62%
copy from 
eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
copy to 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
index 4ffd6a134..2fe7399da 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/data/WebhookRequest.java
@@ -15,18 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.common.config.connector.http;
+package org.apache.eventmesh.connector.http.source.data;
 
+import java.io.Serializable;
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
+/**
+ * Webhook Protocol Request.
+ */
 @Data
-public class SourceConnectorConfig {
+@NoArgsConstructor
+@AllArgsConstructor
+public class WebhookRequest implements Serializable {
+
+    private static final long serialVersionUID = -483500600756490500L;
+
+    private String protocolName;
 
-    private String connectorName;
+    private String url;
 
-    private String path;
+    private Map<String, String> headers;
 
-    private int port;
+    private Object payload;
 
-    private int idleTimeout;
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
new file mode 100644
index 000000000..b671383e5
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/Protocol.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import io.vertx.ext.web.Route;
+
+
+/**
+ * Protocol Interface.
+ * All protocols should implement this interface.
+ */
+public interface Protocol {
+
+
+    /**
+     * Initialize the protocol.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    void initialize(SourceConnectorConfig sourceConnectorConfig);
+
+
+    /**
+     * Handle the protocol message.
+     *
+     * @param route     route
+     * @param queue queue info
+     */
+    void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue);
+
+
+    /**
+     * Convert the message to ConnectRecord.
+     *
+     * @param message message
+     * @return ConnectRecord
+     */
+    ConnectRecord convertToConnectRecord(Object message);
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/ProtocolFactory.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/ProtocolFactory.java
new file mode 100644
index 000000000..6e6100e88
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/ProtocolFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol;
+
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import 
org.apache.eventmesh.connector.http.source.protocol.impl.CloudEventProtocol;
+import org.apache.eventmesh.connector.http.source.protocol.impl.CommonProtocol;
+import org.apache.eventmesh.connector.http.source.protocol.impl.GitHubProtocol;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Protocol factory. This class stores the registered {@link Protocol} classes and creates instances of them.
+ *
+ * <p>Protocol names are case-insensitive. They are normalized with {@link java.util.Locale#ROOT} so that the
+ * lookup does not depend on the default locale of the JVM.
+ */
+public class ProtocolFactory {
+
+    // normalized (lower-case) protocol name -> protocol class
+    private static final ConcurrentHashMap<String, Class<?>> protocols = new ConcurrentHashMap<>();
+
+    static {
+        // register all built-in protocols
+        registerProtocol(CloudEventProtocol.PROTOCOL_NAME, CloudEventProtocol.class);
+        registerProtocol(GitHubProtocol.PROTOCOL_NAME, GitHubProtocol.class);
+        registerProtocol(CommonProtocol.PROTOCOL_NAME, CommonProtocol.class);
+    }
+
+
+    /**
+     * Register a protocol. An existing registration with the same (case-insensitive) name is replaced.
+     *
+     * @param name  name of the protocol
+     * @param clazz class of the protocol, must implement {@link Protocol}
+     * @throws IllegalArgumentException if an argument is null or the class does not implement {@link Protocol}
+     */
+    public static void registerProtocol(String name, Class<?> clazz) {
+        if (name == null || clazz == null) {
+            throw new IllegalArgumentException("Protocol name and class must not be null");
+        }
+        if (!Protocol.class.isAssignableFrom(clazz)) {
+            throw new IllegalArgumentException("Class " + clazz.getName() + " does not implement Protocol interface");
+        }
+        // put the class into the map(case insensitive)
+        protocols.put(normalize(name), clazz);
+    }
+
+    /**
+     * Create and initialize a new instance of a registered protocol. Every call returns a fresh instance.
+     *
+     * @param sourceConnectorConfig config passed to {@link Protocol#initialize(SourceConnectorConfig)}
+     * @param name                  name of the protocol
+     * @return a new, initialized instance of the protocol
+     * @throws IllegalArgumentException if the name is null, not registered, or the class cannot be instantiated
+     */
+    public static Protocol getInstance(SourceConnectorConfig sourceConnectorConfig, String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Protocol name must not be null");
+        }
+        // get the class by name(case insensitive)
+        Class<?> clazz = Optional.ofNullable(protocols.get(normalize(name)))
+            .orElseThrow(() -> new IllegalArgumentException("Protocol " + name + " is not registered"));
+        try {
+            // Class#newInstance is deprecated and rethrows constructor exceptions unwrapped;
+            // go through the no-arg constructor instead.
+            Protocol protocol = (Protocol) clazz.getDeclaredConstructor().newInstance();
+            // initialize the protocol
+            protocol.initialize(sourceConnectorConfig);
+            return protocol;
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalArgumentException("Failed to instantiate protocol " + name, e);
+        }
+    }
+
+    /**
+     * Normalize a protocol name into its map key.
+     *
+     * @param name protocol name, not null
+     * @return locale-independent lower-case form of the name
+     */
+    private static String normalize(String name) {
+        return name.toLowerCase(java.util.Locale.ROOT);
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/WebhookConstants.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/WebhookConstants.java
new file mode 100644
index 000000000..b31637427
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/WebhookConstants.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol;
+
+/**
+ * Constants used by the webhook protocols of the HTTP source connector.
+ */
+public class WebhookConstants {
+
+    // -------------------------------------- About GitHub --------------------------------------
+
+    // Header: a globally unique identifier (GUID) to identify the delivery.
+    public static final String GITHUB_DELIVERY = "X-GitHub-Delivery";
+
+    // Header: sent if the webhook is configured with a secret.
+    // We recommend that you use the more secure X-Hub-Signature-256 instead
+    public static final String GITHUB_SIGNATURE = "X-Hub-Signature";
+
+    // Header: sent if the webhook is configured with a secret
+    public static final String GITHUB_SIGNATURE_256 = "X-Hub-Signature-256";
+
+    // Prefix that GitHubProtocol prepends to the hex HMAC digest before comparing it with the signature header.
+    // NOTE(review): "265" in the name looks like a typo for 256; the name is public, so it is left unchanged here.
+    public static final String GITHUB_HASH_265_PREFIX = "sha256=";
+
+    // Header: the name of the event that triggered the delivery.
+    public static final String GITHUB_EVENT = "X-GitHub-Event";
+
+    // Header: the unique identifier of the webhook.
+    public static final String GITHUB_HOOK_ID = "X-GitHub-Hook-ID";
+
+    // Header: the unique identifier of the resource where the webhook was created.
+    public static final String GITHUB_HOOK_INSTALLATION_TARGET_ID = 
"X-GitHub-Hook-Installation-Target-ID";
+
+    // Header: the type of resource where the webhook was created.
+    public static final String GITHUB_HOOK_INSTALLATION_TARGET_TYPE = 
"X-GitHub-Hook-Installation-Target-Type";
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
new file mode 100644
index 000000000..4906e920f
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CloudEventProtocol.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol.impl;
+
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.http.vertx.VertxMessageFactory;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.Route;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * CloudEvent Protocol. Reads a CloudEvent from each POST request and stores it in the connector queue.
+ */
+@Slf4j
+public class CloudEventProtocol implements Protocol {
+
+    // Protocol name
+    public static final String PROTOCOL_NAME = "CloudEvent";
+
+
+    /**
+     * Initialize the protocol. This protocol has no protocol-specific configuration.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        // nothing to initialize
+    }
+
+
+    /**
+     * Register the CloudEvent handler on the route.
+     *
+     * <p>Responses: 200 when the event was stored, 400 when the request cannot be read as a valid CloudEvent,
+     * 500 when the event could not be stored in the queue.
+     *
+     * @param route route
+     * @param queue queue that receives the parsed {@link CloudEvent}s
+     */
+    @Override
+    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+            .handler(ctx -> VertxMessageFactory.createReader(ctx.request())
+                .map(reader -> requireMandatoryAttributes(reader.toEvent()))
+                .onSuccess(event -> {
+                    // Add the event to the queue, thread-safe.
+                    // An exception thrown from this callback would not reach onFailure below (that handler only
+                    // observes failures of the read/validate future), so the client would never get a response.
+                    // Answer explicitly instead.
+                    if (!queue.offer(event)) {
+                        log.error("[HttpSourceConnector] Failed to store the request. StatusCode={}",
+                            HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
+                        ctx.response()
+                            .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+                            .end(CommonResponse.base("Failed to store the request.").toJsonStr());
+                        return;
+                    }
+                    log.info("[HttpSourceConnector] Succeed to convert payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
+                    ctx.response()
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .end(CommonResponse.success().toJsonStr());
+                })
+                .onFailure(t -> {
+                    log.error("[HttpSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
+                    ctx.response()
+                        .setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
+                        .end(CommonResponse.base(t.getMessage()).toJsonStr());
+                }));
+    }
+
+    /**
+     * Check the attributes this connector requires in addition to what the CloudEvent reader enforces.
+     *
+     * @param event parsed event
+     * @return the same event
+     * @throws IllegalStateException if 'subject', 'datacontenttype' or 'data' is missing
+     */
+    private static CloudEvent requireMandatoryAttributes(CloudEvent event) {
+        if (event.getSubject() == null) {
+            throw new IllegalStateException("attribute 'subject' cannot be null");
+        }
+        if (event.getDataContentType() == null) {
+            throw new IllegalStateException("attribute 'datacontenttype' cannot be null");
+        }
+        if (event.getData() == null) {
+            throw new IllegalStateException("attribute 'data' cannot be null");
+        }
+        return event;
+    }
+
+    /**
+     * Convert the message to ConnectRecord.
+     *
+     * @param message message taken from the queue; must be the {@link CloudEvent} stored by the handler
+     * @return ConnectRecord
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        return CloudEventUtil.convertEventToRecord((CloudEvent) message);
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
new file mode 100644
index 000000000..80e4f0a75
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol.impl;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
+import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.handler.BodyHandler;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common Protocol. This class represents the common webhook protocol. The processing method of this class does not
+ * perform any other operations except storing the request and returning a general response.
+ */
+@Slf4j
+public class CommonProtocol implements Protocol {
+
+    public static final String PROTOCOL_NAME = "Common";
+
+    /**
+     * Initialize the protocol. This protocol has no protocol-specific configuration.
+     *
+     * @param sourceConnectorConfig source connector config
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        // nothing to initialize
+    }
+
+    /**
+     * Set the handler for the route. Every POST request is stored in the queue as a {@link WebhookRequest}
+     * (protocol name, absolute URI, headers, body) and answered with 200.
+     *
+     * @param route route
+     * @param queue queue that receives the stored requests
+     */
+    @Override
+    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+            .handler(BodyHandler.create())
+            .handler(ctx -> {
+                // Get the payload
+                String payloadStr = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+
+                // Create and store the webhook request.
+                // entries() yields one entry per header value, so a repeated header name would make the
+                // two-argument toMap throw; join repeated values with a comma instead.
+                Map<String, String> headerMap = ctx.request().headers().entries().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first + "," + second));
+                WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr);
+                if (!queue.offer(webhookRequest)) {
+                    throw new IllegalStateException("Failed to store the request.");
+                }
+
+                // Return 200 OK
+                ctx.response()
+                    .setStatusCode(HttpResponseStatus.OK.code())
+                    .end(CommonResponse.success().toJsonStr());
+            })
+            .failureHandler(ctx -> {
+                Throwable failure = ctx.failure();
+                log.error("Failed to handle the request. ", failure);
+
+                // Defensive: fall back to 500 when no usable status code is available and to the
+                // reason phrase when the failure carries no throwable.
+                int statusCode = ctx.statusCode() > 0 ? ctx.statusCode() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
+                String message = failure != null ? failure.getMessage() : HttpResponseStatus.valueOf(statusCode).reasonPhrase();
+
+                // Return Bad Response
+                ctx.response()
+                    .setStatusCode(statusCode)
+                    .end(CommonResponse.base(message).toJsonStr());
+            });
+
+    }
+
+    /**
+     * Convert the message to a connect record. The payload becomes the record data; protocol name, URL and
+     * headers are attached as the extensions "source", "url" and "headers".
+     *
+     * @param message message taken from the queue; must be the {@link WebhookRequest} stored by the handler
+     * @return connect record
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        WebhookRequest request = (WebhookRequest) message;
+        ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), request.getPayload());
+        connectRecord.addExtension("source", request.getProtocolName());
+        connectRecord.addExtension("url", request.getUrl());
+        connectRecord.addExtension("headers", request.getHeaders());
+        return connectRecord;
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
new file mode 100644
index 000000000..e86efcbf3
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/GitHubProtocol.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.protocol.impl;
+
+import org.apache.eventmesh.common.Constants;
+import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.source.data.CommonResponse;
+import org.apache.eventmesh.connector.http.source.data.WebhookRequest;
+import org.apache.eventmesh.connector.http.source.protocol.Protocol;
+import org.apache.eventmesh.connector.http.source.protocol.WebhookConstants;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.Route;
+import io.vertx.ext.web.handler.BodyHandler;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * GitHub Protocol. This class represents the GitHub webhook protocol.
+ *
+ * <p>A request is accepted only when its {@code Content-Type} contains the configured content type and its
+ * {@code X-Hub-Signature-256} header matches the HMAC-SHA256 of the body computed with the configured secret.
+ */
+@Slf4j
+public class GitHubProtocol implements Protocol {
+
+    // Protocol name
+    public static final String PROTOCOL_NAME = "GitHub";
+
+    private static final String HMAC_SHA_256 = "HmacSHA256";
+
+    private static final String SECRET_KEY = "secret";
+
+    private String contentType = "application/json";
+
+    private String secret;
+
+
+    /**
+     * Initialize the protocol. Reads "secret" (required) and "contentType" (optional, default
+     * "application/json") from the extra config.
+     *
+     * @param sourceConnectorConfig source connector config
+     * @throws EventMeshException if the secret is missing or blank
+     */
+    @Override
+    public void initialize(SourceConnectorConfig sourceConnectorConfig) {
+        Map<String, String> extraConfig = sourceConnectorConfig.getExtraConfig();
+        // set the secret, if it is not set (or there is no extra config at all), throw an exception
+        this.secret = extraConfig == null ? null : extraConfig.get(SECRET_KEY);
+        if (StringUtils.isBlank(this.secret)) {
+            throw new EventMeshException("The secret is required for GitHub protocol.");
+        }
+        // if the content-type is not set, use the default value
+        this.contentType = extraConfig.getOrDefault("contentType", contentType);
+    }
+
+    /**
+     * Handle the protocol message for GitHub. Valid requests are stored in the queue as {@link WebhookRequest}
+     * and answered with 200; an unexpected content type or an invalid signature is answered with 400.
+     *
+     * @param route route
+     * @param queue queue that receives the stored requests
+     */
+    @Override
+    public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue) {
+        route.method(HttpMethod.POST)
+            .handler(BodyHandler.create())
+            .handler(ctx -> {
+                // Get the payload and headers
+                String payloadStr = ctx.body().asString(Constants.DEFAULT_CHARSET.toString());
+                MultiMap headers = ctx.request().headers();
+
+                // validate the content type
+                if (!StringUtils.contains(headers.get("Content-Type"), contentType)) {
+                    String errorMsg = String.format("content-type is invalid, please check the content-type. received content-type: %s",
+                        headers.get("Content-Type"));
+                    // Return Bad Request
+                    ctx.fail(HttpResponseStatus.BAD_REQUEST.code(), new EventMeshException(errorMsg));
+                    return;
+                }
+
+                // validate the signature (computed over the raw body, before any conversion below)
+                String signature = headers.get(WebhookConstants.GITHUB_SIGNATURE_256);
+                if (BooleanUtils.isFalse(validateSignature(signature, payloadStr, secret))) {
+                    String errorMsg = String.format("signature is invalid, please check the secret. received signature: %s", signature);
+                    // Return Bad Request
+                    ctx.fail(HttpResponseStatus.BAD_REQUEST.code(), new EventMeshException(errorMsg));
+                    return;
+                }
+
+                // if the content type is form data, convert it to json string
+                if (StringUtils.contains(contentType, "application/x-www-form-urlencoded")
+                    || StringUtils.contains(contentType, "multipart/form-data")) {
+                    /*
+                      Convert form data to json string. There are the following benefits:
+                      1. Raw form data is not decoded, so it is not easy to process directly.
+                      2. Converted to reduce storage space by more than 20 percent. Experimental result: 10329 bytes -> 7893 bytes.
+                     */
+                    JSONObject payloadObj = new JSONObject();
+                    ctx.request().formAttributes().forEach(entry -> payloadObj.put(entry.getKey(), entry.getValue()));
+                    payloadStr = payloadObj.toJSONString();
+                }
+
+                // Create and store the webhook request.
+                // entries() yields one entry per header value, so a repeated header name would make the
+                // two-argument toMap throw; join repeated values with a comma instead.
+                Map<String, String> headerMap = headers.entries().stream()
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first + "," + second));
+                WebhookRequest webhookRequest = new WebhookRequest(PROTOCOL_NAME, ctx.request().absoluteURI(), headerMap, payloadStr);
+
+                if (!queue.offer(webhookRequest)) {
+                    throw new IllegalStateException("Failed to store the request.");
+                }
+
+                // Return 200 OK
+                ctx.response()
+                    .setStatusCode(HttpResponseStatus.OK.code())
+                    .end(CommonResponse.success().toJsonStr());
+            })
+            .failureHandler(ctx -> {
+                Throwable failure = ctx.failure();
+                log.error("Failed to handle the request from github. ", failure);
+
+                // Defensive: fall back to 500 when no usable status code is available and to the
+                // reason phrase when the failure carries no throwable.
+                int statusCode = ctx.statusCode() > 0 ? ctx.statusCode() : HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
+                String message = failure != null ? failure.getMessage() : HttpResponseStatus.valueOf(statusCode).reasonPhrase();
+
+                // Return Bad Response
+                ctx.response()
+                    .setStatusCode(statusCode)
+                    .end(CommonResponse.base(message).toJsonStr());
+            });
+    }
+
+
+    /**
+     * Validate the signature: compute {@code "sha256=" + hex(HMAC-SHA256(secret, payload))} and compare it with
+     * the received signature.
+     *
+     * @param signature received signature, may be null (treated as invalid)
+     * @param payload   raw request body, may be null (treated as invalid)
+     * @param secret    secret
+     * @return true if the signature matches, false otherwise
+     * @throws EventMeshException if the HMAC cannot be computed
+     */
+    public boolean validateSignature(String signature, String payload, String secret) {
+        // a request without signature header or without body can never be valid
+        if (signature == null || payload == null) {
+            return false;
+        }
+
+        String expected;
+        try {
+            Mac sha = Mac.getInstance(HMAC_SHA_256);
+            SecretKeySpec secretKey = new SecretKeySpec(secret.getBytes(Constants.DEFAULT_CHARSET), HMAC_SHA_256);
+            sha.init(secretKey);
+            byte[] bytes = sha.doFinal(payload.getBytes(Constants.DEFAULT_CHARSET));
+            expected = WebhookConstants.GITHUB_HASH_265_PREFIX + byteArrayToHexString(bytes);
+        } catch (Exception e) {
+            throw new EventMeshException("Error occurred while validating the signature.", e);
+        }
+
+        // constant-time comparison, so response timing does not reveal how many leading characters matched
+        return java.security.MessageDigest.isEqual(
+            expected.getBytes(Constants.DEFAULT_CHARSET), signature.getBytes(Constants.DEFAULT_CHARSET));
+    }
+
+
+    /**
+     * Convert the byte array to a lower-case hex string, two characters per byte.
+     *
+     * @param bytes bytes, may be null
+     * @return hex string, empty if bytes is null
+     */
+    private String byteArrayToHexString(byte[] bytes) {
+        if (bytes == null) {
+            return "";
+        }
+
+        StringBuilder sb = new StringBuilder(bytes.length * 2);
+        for (byte b : bytes) {
+            String hex = Integer.toHexString(0xFF & b);
+            if (hex.length() == 1) {
+                // If the length is 1, append 0
+                sb.append('0');
+            }
+            sb.append(hex);
+        }
+
+        return sb.toString();
+    }
+
+
+    /**
+     * Convert the message to ConnectRecord. The payload becomes the record data; the delivery id, event name,
+     * protocol name and installation target type are attached as the extensions "id", "topic", "source" and "type".
+     *
+     * @param message message taken from the queue; must be the {@link WebhookRequest} stored by the handler
+     * @return ConnectRecord
+     */
+    @Override
+    public ConnectRecord convertToConnectRecord(Object message) {
+        WebhookRequest request = (WebhookRequest) message;
+        Map<String, String> headers = request.getHeaders();
+
+        // Create the ConnectRecord
+        ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis(), request.getPayload());
+        connectRecord.addExtension("id", headers.get(WebhookConstants.GITHUB_DELIVERY));
+        connectRecord.addExtension("topic", headers.get(WebhookConstants.GITHUB_EVENT));
+        // the source is the protocol name itself, not a header looked up by that name
+        connectRecord.addExtension("source", request.getProtocolName());
+        connectRecord.addExtension("type", headers.get(WebhookConstants.GITHUB_HOOK_INSTALLATION_TARGET_TYPE));
+        return connectRecord;
+    }
+
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
index 9fcc471d3..b1edc084f 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
@@ -28,4 +28,11 @@ connectorConfig:
     connectorName: httpSource
     path: /test
     port: 3755
-    idleTimeout: 5
\ No newline at end of file
+    idleTimeout: 5000   # timeunit: ms
+    maxFormAttributeSize: 1048576 # unit: bytes, default: 1048576 (1 MB). This applies only when handling form data submissions.
+    maxStorageSize: 1000 # max storage size, default: 1000
+    batchSize: 10 # batch size, default: 10
+    protocol: CloudEvent # Case insensitive, default: CloudEvent, options: 
CloudEvent, GitHub, Common
+    extraConfig: # extra config for different protocol, e.g. GitHub secret
+        secret: xxxxxxx # GitHub secret
+        contentType: application/json # GitHub content type
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
index 90bad0832..b764b4a98 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
@@ -17,22 +17,16 @@
 
 package org.apache.eventmesh.connector.http.source.connector;
 
+
 import org.apache.eventmesh.common.config.connector.http.HttpSourceConfig;
 import org.apache.eventmesh.common.config.connector.http.SourceConnectorConfig;
 import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import org.apache.eventmesh.openconnect.util.ConfigUtil;
 
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-
+import java.net.URL;
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
 import org.junit.jupiter.api.AfterEach;
@@ -40,12 +34,19 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
 class HttpSourceConnectorTest {
 
     private HttpSourceConnector connector;
     private SourceConnectorConfig config;
-    private CloseableHttpClient httpClient;
-    private String uri;
+    private OkHttpClient httpClient;
+    private String url;
     private final String expectedMessage = "testHttpMessage";
 
     @BeforeEach
@@ -56,9 +57,8 @@ class HttpSourceConnectorTest {
         connector.init(sourceConfig);
         connector.start();
 
-        uri = new 
URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();
-
-        httpClient = HttpClients.createDefault();
+        url = new URL("http", "127.0.0.1", config.getPort(), 
config.getPath()).toString();
+        httpClient = new OkHttpClient();
     }
 
     @Test
@@ -66,9 +66,9 @@ class HttpSourceConnectorTest {
         final int batchSize = 10;
         // test binary content mode
         for (int i = 0; i < batchSize; i++) {
-            HttpResponse resp = mockBinaryRequest();
-            Assertions.assertEquals(resp.getStatusLine().getStatusCode(), 
HttpStatus.SC_OK);
-
+            try (Response resp = mockBinaryRequest()) {
+                Assertions.assertEquals(200, resp.code());
+            }
         }
         List<ConnectRecord> res = connector.poll();
         Assertions.assertEquals(batchSize, res.size());
@@ -78,8 +78,9 @@ class HttpSourceConnectorTest {
 
         // test structured content mode
         for (int i = 0; i < batchSize; i++) {
-            HttpResponse resp = mockStructuredRequest();
-            Assertions.assertEquals(resp.getStatusLine().getStatusCode(), 
HttpStatus.SC_OK);
+            try (Response resp = mockStructuredRequest()) {
+                Assertions.assertEquals(200, resp.code());
+            }
         }
         res = connector.poll();
         Assertions.assertEquals(batchSize, res.size());
@@ -88,30 +89,39 @@ class HttpSourceConnectorTest {
         }
 
         // test invalid requests
-        HttpPost invalidPost = new HttpPost(uri);
-        invalidPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
-        invalidPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
-        HttpResponse resp = httpClient.execute(invalidPost);
-        Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, 
resp.getStatusLine().getStatusCode());
+        Request request = new Request.Builder()
+            .url(url)
+            .addHeader("Content-Type", "text/plain")
+            .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
+            .build();
+
+        try (Response resp = httpClient.newCall(request).execute()) {
+            // verify the response code
+            Assertions.assertEquals(405, resp.code());
+        }
+
     }
 
-    HttpResponse mockBinaryRequest() throws Exception {
-        HttpPost httpPost = new HttpPost(uri);
-        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
-        httpPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
-        httpPost.setHeader("ce-specversion", "1.0");
-        httpPost.setHeader("ce-type", "com.example.someevent");
-        httpPost.setHeader("ce-source", "/mycontext");
-        httpPost.setHeader("ce-subject", "test");
-        httpPost.setEntity(new StringEntity(expectedMessage));
-
-        return httpClient.execute(httpPost);
+    Response mockBinaryRequest() throws Exception {
+
+        RequestBody body = RequestBody.create(expectedMessage, 
MediaType.parse("text/plain"));
+
+        Request request = new Request.Builder()
+            .url(url)
+            .addHeader("Content-Type", "text/plain")
+            .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
+            .addHeader("ce-specversion", "1.0")
+            .addHeader("ce-type", "com.example.someevent")
+            .addHeader("ce-source", "/mycontext")
+            .addHeader("ce-subject", "test")
+            .post(body)
+            .build();
+
+        return httpClient.newCall(request).execute();
     }
 
-    HttpResponse mockStructuredRequest() throws Exception {
-        HttpPost httpPost = new HttpPost(uri);
-        // according to the CloudEvent specification, a json format event MUST use the media type `application/cloudevents+json`
-        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/cloudevents+json");
+    Response mockStructuredRequest() throws Exception {
+        // create a CloudEvent
         TestEvent event = new TestEvent();
         event.id = String.valueOf(UUID.randomUUID());
         event.specversion = "1.0";
@@ -120,15 +130,23 @@ class HttpSourceConnectorTest {
         event.subject = "test";
         event.datacontenttype = "text/plain";
         event.data = expectedMessage;
-        httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
 
-        return httpClient.execute(httpPost);
+        RequestBody body = RequestBody.create(Objects.requireNonNull(JsonUtils.toJSONString(event)), MediaType.parse("application/cloudevents+json"));
+
+        Request request = new Request.Builder()
+            .url(url)
+            .addHeader("Content-Type", "application/cloudevents+json")
+            .post(body)
+            .build();
+
+        return httpClient.newCall(request).execute();
+
     }
 
     @AfterEach
-    void tearDown() throws Exception {
+    void tearDown() {
         connector.stop();
-        httpClient.close();
+        httpClient.dispatcher().executorService().shutdown();
     }
 
     class TestEvent {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
index 0a3e68d07..735d3b01d 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
@@ -27,4 +27,12 @@ pubSubConfig:
 connectorConfig:
     connectorName: httpSource
     path: /test
-    port: 3755
\ No newline at end of file
+    port: 3755
+    idleTimeout: 5000   # timeunit: ms
+    maxFormAttributeSize: 1048576 # unit: byte, default: 1048576(1MB). This applies only when handling form data submissions.
+    maxStorageSize: 1000 # max storage size, default: 1000
+    batchSize: 10 # batch size, default: 10
+    protocol: CloudEvent # Case insensitive, default: CloudEvent, options: CloudEvent, GitHub, Common
+    extraConfig: # extra config for different protocol, e.g. GitHub secret
+        secret: xxxxxxx # GitHub secret
+        contentType: application/json # GitHub content type
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to