This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a53196887 [ISSUE #4618] Add HTTP source connector (#4634)
a53196887 is described below

commit a53196887771e338a0356cc1631bfb0150bd1c30
Author: Fungx <[email protected]>
AuthorDate: Tue Dec 12 15:23:27 2023 +0800

    [ISSUE #4618] Add HTTP source connector (#4634)
    
    * [ISSUE #4618] Add HTTP source connector
    
    * fix exception & config
---
 eventmesh-connectors/README.md                     |   2 +-
 .../eventmesh-connector-http/build.gradle          |  27 ++++
 .../eventmesh-connector-http/gradle.properties     |  18 +++
 .../connector/http/config/HttpServerConfig.java    |  32 +++++
 .../connector/http/server/HttpConnectServer.java   |  40 ++++++
 .../http/source/config/HttpSourceConfig.java       |  30 +++++
 .../http/source/config/SourceConnectorConfig.java  |  32 +++++
 .../http/source/connector/HttpSourceConnector.java | 145 +++++++++++++++++++++
 .../src/main/resources/server-config.yml           |  19 +++
 .../src/main/resources/source-config.yml           |  31 +++++
 .../source/connector/HttpSourceConnectorTest.java  | 145 +++++++++++++++++++++
 .../src/test/resources/server-config.yml           |  19 +++
 .../src/test/resources/source-config.yml           |  30 +++++
 settings.gradle                                    |   1 +
 tools/dependency-check/known-dependencies.txt      |  28 +++-
 15 files changed, 594 insertions(+), 5 deletions(-)

diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md
index ccd1e7dd0..fad80095b 100644
--- a/eventmesh-connectors/README.md
+++ b/eventmesh-connectors/README.md
@@ -36,7 +36,7 @@ Add a new connector by implementing the source/sink interface 
using :
 |         [File](eventmesh-connector-file)         |    Sink     |    ✅    |
 |                      Github                      |   Source    |    ⬜    |
 |                      Github                      |    Sink     |    ⬜    |
-|                       Http                       |   Source    |    ⬜    |
+|         [Http](eventmesh-connector-http)         |   Source    |    ✅    |
 |                       Http                       |    Sink     |    ⬜    |
 |                       Jdbc                       |   Source    |    ⬜    |
 |         [Jdbc](eventmesh-connector-jdbc)         |    Sink     |    ✅    |
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle 
b/eventmesh-connectors/eventmesh-connector-http/build.gradle
new file mode 100644
index 000000000..734b2fc62
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+    api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+    implementation project(":eventmesh-common")
+    implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
+    implementation 'io.vertx:vertx-web:4.4.6'
+
+    testImplementation "org.apache.httpcomponents:httpclient"
+    compileOnly 'org.projectlombok:lombok'
+    annotationProcessor 'org.projectlombok:lombok'
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/gradle.properties 
b/eventmesh-connectors/eventmesh-connector-http/gradle.properties
new file mode 100644
index 000000000..cc0e7324c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+pluginType=connector
+pluginName=http
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java
new file mode 100644
index 000000000..81a9f2092
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class HttpServerConfig extends Config {
+
+    private boolean sourceEnable;
+
+    private boolean sinkEnable;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
new file mode 100644
index 000000000..bd94fed12
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.server;
+
+import org.apache.eventmesh.connector.http.config.HttpServerConfig;
+import 
org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class HttpConnectServer {
+
+    public static void main(String[] args) throws Exception {
+        HttpServerConfig serverConfig = 
ConfigUtil.parse(HttpServerConfig.class, "server-config.yml");
+
+        if (serverConfig.isSourceEnable()) {
+            Application httpSourceApp = new Application();
+            httpSourceApp.run(HttpSourceConnector.class);
+        }
+
+        if (serverConfig.isSinkEnable()) {
+            // TODO support sink connector
+        }
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java
new file mode 100644
index 000000000..bee870cb1
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.config;
+
+import org.apache.eventmesh.openconnect.api.config.SourceConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class HttpSourceConfig extends SourceConfig {
+
+    public SourceConnectorConfig connectorConfig;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java
new file mode 100644
index 000000000..873a0d192
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.config;
+
+import lombok.Data;
+
+@Data
+public class SourceConnectorConfig {
+
+    private String connectorName;
+
+    private String path;
+
+    private int port;
+
+    private int idleTimeout;
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
new file mode 100644
index 000000000..b4b58f9f8
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.connector;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.http.vertx.VertxMessageFactory;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HttpSourceConnector implements Source {
+
+    private static final int DEFAULT_BATCH_SIZE = 10;
+
+    private HttpSourceConfig sourceConfig;
+    private BlockingQueue<CloudEvent> queue;
+    private HttpServer server;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return HttpSourceConfig.class;
+    }
+
+    @Override
+    public void init(Config config) {
+        this.sourceConfig = (HttpSourceConfig) config;
+        doInit();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) {
+        SourceConnectorContext sourceConnectorContext = 
(SourceConnectorContext) connectorContext;
+        this.sourceConfig = (HttpSourceConfig) 
sourceConnectorContext.getSourceConfig();
+        doInit();
+    }
+
+    private void doInit() {
+        this.queue = new LinkedBlockingQueue<>(1000);
+
+        final Vertx vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        router.route()
+            .path(this.sourceConfig.connectorConfig.getPath())
+            .method(HttpMethod.POST)
+            .handler(LoggerHandler.create())
+            .handler(ctx -> {
+                VertxMessageFactory.createReader(ctx.request())
+                    .map(MessageReader::toEvent)
+                    .onSuccess(event -> {
+                        queue.add(event);
+                        log.info("[HttpSourceConnector] Succeed to convert 
payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
+                        
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+                    })
+                    .onFailure(t -> {
+                        log.error("[HttpSourceConnector] Malformed request. 
StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
+                        
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(t.getMessage()).end();
+                    });
+            });
+        this.server = vertx.createHttpServer(new HttpServerOptions()
+            .setPort(this.sourceConfig.connectorConfig.getPort())
+            
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
+    }
+
+    @Override
+    public void start() {
+        Throwable t = this.server.listen().cause();
+        if (t != null) {
+            throw new EventMeshException("failed to start Vertx server", t);
+        }
+    }
+
+    @Override
+    public void commit(ConnectRecord record) {
+
+    }
+
+    @Override
+    public String name() {
+        return this.sourceConfig.getConnectorConfig().getConnectorName();
+    }
+
+    @Override
+    public void stop() {
+        Throwable t = this.server.close().cause();
+        if (t != null) {
+            throw new EventMeshException("failed to stop Vertx server", t);
+        }
+    }
+
+    @Override
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> connectRecords = new 
ArrayList<>(DEFAULT_BATCH_SIZE);
+        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
+            try {
+                CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+                if (event == null) {
+                    break;
+                }
+                connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+            } catch (InterruptedException e) {
+                break;
+            }
+        }
+        return connectRecords;
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
new file mode 100644
index 000000000..9fcc471d3
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TopicTest
+    idc: FT
+    env: PRD
+    group: httpSource
+    appId: 5032
+    userName: httpSourceUser
+    passWord: httpPassWord
+connectorConfig:
+    connectorName: httpSource
+    path: /test
+    port: 3755
+    idleTimeout: 5
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
new file mode 100644
index 000000000..35d58b75c
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
+import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class HttpSourceConnectorTest {
+
+    private HttpSourceConnector connector;
+    private SourceConnectorConfig config;
+    private CloseableHttpClient httpClient;
+    private String uri;
+    private final String expectedMessage = "testHttpMessage";
+
+    @BeforeEach
+    void setUp() throws Exception {
+        connector = new HttpSourceConnector();
+        HttpSourceConfig sourceConfig = (HttpSourceConfig) 
ConfigUtil.parse(connector.configClass());
+        config = sourceConfig.getConnectorConfig();
+        connector.init(sourceConfig);
+        connector.start();
+
+        uri = new 
URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();
+
+        httpClient = HttpClients.createDefault();
+    }
+
+    @Test
+    void testPoll() throws Exception {
+        final int batchSize = 10;
+        // test binary content mode
+        for (int i = 0; i < batchSize; i++) {
+            HttpResponse resp = mockBinaryRequest();
+            Assertions.assertEquals(resp.getStatusLine().getStatusCode(), 
HttpStatus.SC_OK);
+
+        }
+        List<ConnectRecord> res = connector.poll();
+        Assertions.assertEquals(batchSize, res.size());
+        for (ConnectRecord r : res) {
+            Assertions.assertEquals(expectedMessage, new String((byte[]) 
r.getData()));
+        }
+
+        // test structured content mode
+        for (int i = 0; i < batchSize; i++) {
+            HttpResponse resp = mockStructuredRequest();
+            Assertions.assertEquals(resp.getStatusLine().getStatusCode(), 
HttpStatus.SC_OK);
+        }
+        res = connector.poll();
+        Assertions.assertEquals(batchSize, res.size());
+        for (ConnectRecord r : res) {
+            Assertions.assertEquals(expectedMessage, new String((byte[]) 
r.getData()));
+        }
+
+        // test invalid requests
+        HttpPost invalidPost = new HttpPost(uri);
+        invalidPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
+        invalidPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
+        HttpResponse resp = httpClient.execute(invalidPost);
+        Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, 
resp.getStatusLine().getStatusCode());
+    }
+
+    HttpResponse mockBinaryRequest() throws Exception {
+        HttpPost httpPost = new HttpPost(uri);
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
+        httpPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
+        httpPost.setHeader("ce-specversion", "1.0");
+        httpPost.setHeader("ce-type", "com.example.someevent");
+        httpPost.setHeader("ce-source", "/mycontext");
+        httpPost.setHeader("ce-subject", "test");
+        httpPost.setEntity(new StringEntity(expectedMessage));
+
+        return httpClient.execute(httpPost);
+    }
+
+    HttpResponse mockStructuredRequest() throws Exception {
+        HttpPost httpPost = new HttpPost(uri);
+        // according to the CloudEvent specification, a json format event MUST 
use the media type `application/cloudevents+json`
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, 
"application/cloudevents+json");
+        TestEvent event = new TestEvent();
+        event.id = String.valueOf(UUID.randomUUID());
+        event.specversion = "1.0";
+        event.type = "com.example.someevent";
+        event.source = "/mycontext";
+        event.subject = "test";
+        event.datacontenttype = "text/plain";
+        event.data = expectedMessage;
+        httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
+
+        return httpClient.execute(httpPost);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        connector.stop();
+        httpClient.close();
+    }
+
+    class TestEvent {
+
+        public String specversion;
+        public String type;
+        public String source;
+        public String subject;
+        public String datacontenttype;
+        public String id;
+
+        public String data;
+    }
+}
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
new file mode 100644
index 000000000..0a3e68d07
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+    meshAddress: 127.0.0.1:10000
+    subject: TopicTest
+    idc: FT
+    env: PRD
+    group: httpSource
+    appId: 5032
+    userName: httpSourceUser
+    passWord: httpmqPassWord
+connectorConfig:
+    connectorName: httpSource
+    path: /test
+    port: 3755
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index ac5446664..2877fbd39 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -49,6 +49,7 @@ include 'eventmesh-connectors:eventmesh-connector-lark'
 include 'eventmesh-connectors:eventmesh-connector-wecom'
 include 'eventmesh-connectors:eventmesh-connector-slack'
 include 'eventmesh-connectors:eventmesh-connector-wechat'
+include 'eventmesh-connectors:eventmesh-connector-http'
 
 include 'eventmesh-storage-plugin:eventmesh-storage-api'
 include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
diff --git a/tools/dependency-check/known-dependencies.txt 
b/tools/dependency-check/known-dependencies.txt
index 7ccc8115b..cb774041c 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -25,6 +25,7 @@ cache-api-1.1.1.jar
 checker-qual-3.12.0.jar
 cloudevents-api-2.4.2.jar
 cloudevents-core-2.4.2.jar
+cloudevents-http-vertx-2.3.0.jar
 cloudevents-json-jackson-2.4.2.jar
 cloudevents-kafka-2.4.2.jar
 cloudevents-protobuf-2.4.2.jar
@@ -64,6 +65,7 @@ grpc-protobuf-lite-1.43.2.jar
 grpc-stub-1.43.2.jar
 gson-2.8.2.jar
 guava-31.0.1-jre.jar
+guava-retrying-2.0.0.jar
 guice-4.2.2.jar
 httpasyncclient-4.1.3.jar
 httpclient-4.5.13.jar
@@ -117,24 +119,35 @@ nacos-client-2.2.1.jar
 nacos-encryption-plugin-2.2.1.jar
 netty-3.10.6.Final.jar
 netty-all-4.1.79.Final.jar
+netty-buffer-4.1.100.Final.jar
 netty-buffer-4.1.79.Final.jar
+netty-codec-4.1.100.Final.jar
 netty-codec-4.1.79.Final.jar
+netty-codec-dns-4.1.100.Final.jar
 netty-codec-dns-4.1.79.Final.jar
 netty-codec-haproxy-4.1.79.Final.jar
-netty-codec-http-4.1.79.Final.jar
+netty-codec-http2-4.1.100.Final.jar
 netty-codec-http2-4.1.79.Final.jar
+netty-codec-http-4.1.100.Final.jar
+netty-codec-http-4.1.79.Final.jar
 netty-codec-memcache-4.1.79.Final.jar
 netty-codec-mqtt-4.1.79.Final.jar
 netty-codec-redis-4.1.79.Final.jar
 netty-codec-smtp-4.1.79.Final.jar
+netty-codec-socks-4.1.100.Final.jar
 netty-codec-socks-4.1.79.Final.jar
 netty-codec-stomp-4.1.79.Final.jar
 netty-codec-xml-4.1.79.Final.jar
+netty-common-4.1.100.Final.jar
 netty-common-4.1.79.Final.jar
+netty-handler-4.1.100.Final.jar
 netty-handler-4.1.79.Final.jar
+netty-handler-proxy-4.1.100.Final.jar
 netty-handler-proxy-4.1.79.Final.jar
 netty-reactive-streams-2.0.4.jar
+netty-resolver-4.1.100.Final.jar
 netty-resolver-4.1.79.Final.jar
+netty-resolver-dns-4.1.100.Final.jar
 netty-resolver-dns-4.1.79.Final.jar
 netty-resolver-dns-classes-macos-4.1.79.Final.jar
 netty-resolver-dns-native-macos-4.1.79.Final-osx-aarch_64.jar
@@ -143,14 +156,16 @@ netty-tcnative-boringssl-static-2.0.48.Final.jar
 netty-tcnative-boringssl-static-2.0.51.Final.jar
 netty-tcnative-classes-2.0.48.Final.jar
 netty-tcnative-classes-2.0.51.Final.jar
+netty-transport-4.1.100.Final.jar
 netty-transport-4.1.79.Final.jar
 netty-transport-classes-epoll-4.1.79.Final.jar
 netty-transport-classes-kqueue-4.1.79.Final.jar
+netty-transport-native-epoll-4.1.79.Final.jar
 netty-transport-native-epoll-4.1.79.Final-linux-aarch_64.jar
 netty-transport-native-epoll-4.1.79.Final-linux-x86_64.jar
-netty-transport-native-epoll-4.1.79.Final.jar
 netty-transport-native-kqueue-4.1.79.Final-osx-aarch_64.jar
 netty-transport-native-kqueue-4.1.79.Final-osx-x86_64.jar
+netty-transport-native-unix-common-4.1.100.Final.jar
 netty-transport-native-unix-common-4.1.79.Final.jar
 netty-transport-rxtx-4.1.79.Final.jar
 netty-transport-sctp-4.1.79.Final.jar
@@ -185,13 +200,13 @@ pravega-shared-authplugin-0.11.0.jar
 pravega-shared-controller-api-0.11.0.jar
 pravega-shared-protocol-0.11.0.jar
 pravega-shared-security-0.11.0.jar
-proto-google-common-protos-2.0.1.jar
 protobuf-java-3.19.2.jar
 protobuf-java-3.21.5.jar
 protobuf-java-util-3.15.0.jar
 protobuf-java-util-3.17.2.jar
 protobuf-java-util-3.21.5.jar
 protobuf-java-util-3.5.1.jar
+proto-google-common-protos-2.0.1.jar
 pulsar-client-2.10.1.jar
 pulsar-client-admin-api-2.10.1.jar
 pulsar-client-api-2.10.1.jar
@@ -217,6 +232,12 @@ slf4j-api-1.7.30.jar
 snakeyaml-1.30.jar
 snappy-java-1.1.8.1.jar
 validation-api-1.1.0.Final.jar
+vertx-auth-common-4.4.6.jar
+vertx-bridge-common-4.4.6.jar
+vertx-core-4.4.6.jar
+vertx-web-4.4.6.jar
+vertx-web-client-4.0.0.jar
+vertx-web-common-4.4.6.jar
 zipkin-2.23.2.jar
 zipkin-reporter-2.16.3.jar
 zipkin-sender-okhttp3-2.16.3.jar
@@ -224,4 +245,3 @@ zookeeper-3.7.1.jar
 zookeeper-jute-3.7.1.jar
 zstd-jni-1.5.0-2.jar
 zstd-jni-1.5.2-2.jar
-guava-retrying-2.0.0.jar
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to