This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a53196887 [ISSUE #4618] Add HTTP source connector (#4634)
a53196887 is described below
commit a53196887771e338a0356cc1631bfb0150bd1c30
Author: Fungx <[email protected]>
AuthorDate: Tue Dec 12 15:23:27 2023 +0800
[ISSUE #4618] Add HTTP source connector (#4634)
* [ISSUE #4618] Add HTTP source connector
* fix exception & config
---
eventmesh-connectors/README.md | 2 +-
.../eventmesh-connector-http/build.gradle | 27 ++++
.../eventmesh-connector-http/gradle.properties | 18 +++
.../connector/http/config/HttpServerConfig.java | 32 +++++
.../connector/http/server/HttpConnectServer.java | 40 ++++++
.../http/source/config/HttpSourceConfig.java | 30 +++++
.../http/source/config/SourceConnectorConfig.java | 32 +++++
.../http/source/connector/HttpSourceConnector.java | 145 +++++++++++++++++++++
.../src/main/resources/server-config.yml | 19 +++
.../src/main/resources/source-config.yml | 31 +++++
.../source/connector/HttpSourceConnectorTest.java | 145 +++++++++++++++++++++
.../src/test/resources/server-config.yml | 19 +++
.../src/test/resources/source-config.yml | 30 +++++
settings.gradle | 1 +
tools/dependency-check/known-dependencies.txt | 28 +++-
15 files changed, 594 insertions(+), 5 deletions(-)
diff --git a/eventmesh-connectors/README.md b/eventmesh-connectors/README.md
index ccd1e7dd0..fad80095b 100644
--- a/eventmesh-connectors/README.md
+++ b/eventmesh-connectors/README.md
@@ -36,7 +36,7 @@ Add a new connector by implementing the source/sink interface
using :
| [File](eventmesh-connector-file) | Sink | ✅ |
| Github | Source | ⬜ |
| Github | Sink | ⬜ |
-| Http | Source | ⬜ |
+| [Http](eventmesh-connector-http) | Source | ✅ |
| Http | Sink | ⬜ |
| Jdbc | Source | ⬜ |
| [Jdbc](eventmesh-connector-jdbc) | Sink | ✅ |
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle
b/eventmesh-connectors/eventmesh-connector-http/build.gradle
new file mode 100644
index 000000000..734b2fc62
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+dependencies {
+ api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+ implementation project(":eventmesh-common")
+ implementation 'io.cloudevents:cloudevents-http-vertx:2.3.0'
+ implementation 'io.vertx:vertx-web:4.4.6'
+
+ testImplementation "org.apache.httpcomponents:httpclient"
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
+}
\ No newline at end of file
diff --git a/eventmesh-connectors/eventmesh-connector-http/gradle.properties
b/eventmesh-connectors/eventmesh-connector-http/gradle.properties
new file mode 100644
index 000000000..cc0e7324c
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-http/gradle.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+pluginType=connector
+pluginName=http
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java
new file mode 100644
index 000000000..81a9f2092
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/config/HttpServerConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.config;
+
+import org.apache.eventmesh.openconnect.api.config.Config;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class HttpServerConfig extends Config {
+
+ private boolean sourceEnable;
+
+ private boolean sinkEnable;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
new file mode 100644
index 000000000..bd94fed12
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/server/HttpConnectServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.server;
+
+import org.apache.eventmesh.connector.http.config.HttpServerConfig;
+import
org.apache.eventmesh.connector.http.source.connector.HttpSourceConnector;
+import org.apache.eventmesh.openconnect.Application;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+public class HttpConnectServer {
+
+ public static void main(String[] args) throws Exception {
+ HttpServerConfig serverConfig =
ConfigUtil.parse(HttpServerConfig.class, "server-config.yml");
+
+ if (serverConfig.isSourceEnable()) {
+ Application httpSourceApp = new Application();
+ httpSourceApp.run(HttpSourceConnector.class);
+ }
+
+ if (serverConfig.isSinkEnable()) {
+ // TODO support sink connector
+ }
+ }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java
new file mode 100644
index 000000000..bee870cb1
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/HttpSourceConfig.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.config;
+
+import org.apache.eventmesh.openconnect.api.config.SourceConfig;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class HttpSourceConfig extends SourceConfig {
+
+ public SourceConnectorConfig connectorConfig;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java
new file mode 100644
index 000000000..873a0d192
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/config/SourceConnectorConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.config;
+
+import lombok.Data;
+
+@Data
+public class SourceConnectorConfig {
+
+ private String connectorName;
+
+ private String path;
+
+ private int port;
+
+ private int idleTimeout;
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
new file mode 100644
index 000000000..b4b58f9f8
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.connector;
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.CloudEventUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.message.MessageReader;
+import io.cloudevents.http.vertx.VertxMessageFactory;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HttpSourceConnector implements Source {
+
+ private static final int DEFAULT_BATCH_SIZE = 10;
+
+ private HttpSourceConfig sourceConfig;
+ private BlockingQueue<CloudEvent> queue;
+ private HttpServer server;
+
+ @Override
+ public Class<? extends Config> configClass() {
+ return HttpSourceConfig.class;
+ }
+
+ @Override
+ public void init(Config config) {
+ this.sourceConfig = (HttpSourceConfig) config;
+ doInit();
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) {
+ SourceConnectorContext sourceConnectorContext =
(SourceConnectorContext) connectorContext;
+ this.sourceConfig = (HttpSourceConfig)
sourceConnectorContext.getSourceConfig();
+ doInit();
+ }
+
+ private void doInit() {
+ this.queue = new LinkedBlockingQueue<>(1000);
+
+ final Vertx vertx = Vertx.vertx();
+ final Router router = Router.router(vertx);
+ router.route()
+ .path(this.sourceConfig.connectorConfig.getPath())
+ .method(HttpMethod.POST)
+ .handler(LoggerHandler.create())
+ .handler(ctx -> {
+ VertxMessageFactory.createReader(ctx.request())
+ .map(MessageReader::toEvent)
+ .onSuccess(event -> {
+ queue.add(event);
+ log.info("[HttpSourceConnector] Succeed to convert
payload into CloudEvent. StatusCode={}", HttpResponseStatus.OK.code());
+
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
+ })
+ .onFailure(t -> {
+ log.error("[HttpSourceConnector] Malformed request.
StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), t);
+
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).setStatusMessage(t.getMessage()).end();
+ });
+ });
+ this.server = vertx.createHttpServer(new HttpServerOptions()
+ .setPort(this.sourceConfig.connectorConfig.getPort())
+
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
+ }
+
+ @Override
+ public void start() {
+ Throwable t = this.server.listen().cause();
+ if (t != null) {
+ throw new EventMeshException("failed to start Vertx server", t);
+ }
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.sourceConfig.getConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public void stop() {
+ Throwable t = this.server.close().cause();
+ if (t != null) {
+ throw new EventMeshException("failed to stop Vertx server", t);
+ }
+ }
+
+ @Override
+ public List<ConnectRecord> poll() {
+ List<ConnectRecord> connectRecords = new
ArrayList<>(DEFAULT_BATCH_SIZE);
+ for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
+ try {
+ CloudEvent event = queue.poll(3, TimeUnit.SECONDS);
+ if (event == null) {
+ break;
+ }
+ connectRecords.add(CloudEventUtil.convertEventToRecord(event));
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ return connectRecords;
+ }
+
+}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
new file mode 100644
index 000000000..9fcc471d3
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/resources/source-config.yml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: httpSource
+ appId: 5032
+ userName: httpSourceUser
+ passWord: httpPassWord
+connectorConfig:
+ connectorName: httpSource
+ path: /test
+ port: 3755
+ idleTimeout: 5
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
new file mode 100644
index 000000000..35d58b75c
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.source.connector;
+
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.source.config.HttpSourceConfig;
+import org.apache.eventmesh.connector.http.source.config.SourceConnectorConfig;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.openconnect.util.ConfigUtil;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class HttpSourceConnectorTest {
+
+ private HttpSourceConnector connector;
+ private SourceConnectorConfig config;
+ private CloseableHttpClient httpClient;
+ private String uri;
+ private final String expectedMessage = "testHttpMessage";
+
+ @BeforeEach
+ void setUp() throws Exception {
+ connector = new HttpSourceConnector();
+ HttpSourceConfig sourceConfig = (HttpSourceConfig)
ConfigUtil.parse(connector.configClass());
+ config = sourceConfig.getConnectorConfig();
+ connector.init(sourceConfig);
+ connector.start();
+
+ uri = new
URIBuilder().setScheme("http").setHost("127.0.0.1").setPort(config.getPort()).setPath(config.getPath()).build().toString();
+
+ httpClient = HttpClients.createDefault();
+ }
+
+ @Test
+ void testPoll() throws Exception {
+ final int batchSize = 10;
+ // test binary content mode
+ for (int i = 0; i < batchSize; i++) {
+ HttpResponse resp = mockBinaryRequest();
+ Assertions.assertEquals(resp.getStatusLine().getStatusCode(),
HttpStatus.SC_OK);
+
+ }
+ List<ConnectRecord> res = connector.poll();
+ Assertions.assertEquals(batchSize, res.size());
+ for (ConnectRecord r : res) {
+ Assertions.assertEquals(expectedMessage, new String((byte[])
r.getData()));
+ }
+
+ // test structured content mode
+ for (int i = 0; i < batchSize; i++) {
+ HttpResponse resp = mockStructuredRequest();
+ Assertions.assertEquals(resp.getStatusLine().getStatusCode(),
HttpStatus.SC_OK);
+ }
+ res = connector.poll();
+ Assertions.assertEquals(batchSize, res.size());
+ for (ConnectRecord r : res) {
+ Assertions.assertEquals(expectedMessage, new String((byte[])
r.getData()));
+ }
+
+ // test invalid requests
+ HttpPost invalidPost = new HttpPost(uri);
+ invalidPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
+ invalidPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
+ HttpResponse resp = httpClient.execute(invalidPost);
+ Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST,
resp.getStatusLine().getStatusCode());
+ }
+
+ HttpResponse mockBinaryRequest() throws Exception {
+ HttpPost httpPost = new HttpPost(uri);
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "text/plain");
+ httpPost.setHeader("ce-id", String.valueOf(UUID.randomUUID()));
+ httpPost.setHeader("ce-specversion", "1.0");
+ httpPost.setHeader("ce-type", "com.example.someevent");
+ httpPost.setHeader("ce-source", "/mycontext");
+ httpPost.setHeader("ce-subject", "test");
+ httpPost.setEntity(new StringEntity(expectedMessage));
+
+ return httpClient.execute(httpPost);
+ }
+
+ HttpResponse mockStructuredRequest() throws Exception {
+ HttpPost httpPost = new HttpPost(uri);
+ // according to the CloudEvent specification, a json format event MUST
use the media type `application/cloudevents+json`
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE,
"application/cloudevents+json");
+ TestEvent event = new TestEvent();
+ event.id = String.valueOf(UUID.randomUUID());
+ event.specversion = "1.0";
+ event.type = "com.example.someevent";
+ event.source = "/mycontext";
+ event.subject = "test";
+ event.datacontenttype = "text/plain";
+ event.data = expectedMessage;
+ httpPost.setEntity(new StringEntity(JsonUtils.toJSONString(event)));
+
+ return httpClient.execute(httpPost);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ connector.stop();
+ httpClient.close();
+ }
+
+ class TestEvent {
+
+ public String specversion;
+ public String type;
+ public String source;
+ public String subject;
+ public String datacontenttype;
+ public String id;
+
+ public String data;
+ }
+}
\ No newline at end of file
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml
new file mode 100644
index 000000000..0cd7b5b5a
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/server-config.yml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+sourceEnable: true
+sinkEnable: false
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
new file mode 100644
index 000000000..0a3e68d07
--- /dev/null
+++
b/eventmesh-connectors/eventmesh-connector-http/src/test/resources/source-config.yml
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+ meshAddress: 127.0.0.1:10000
+ subject: TopicTest
+ idc: FT
+ env: PRD
+ group: httpSource
+ appId: 5032
+ userName: httpSourceUser
+ passWord: httpmqPassWord
+connectorConfig:
+ connectorName: httpSource
+ path: /test
+ port: 3755
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index ac5446664..2877fbd39 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -49,6 +49,7 @@ include 'eventmesh-connectors:eventmesh-connector-lark'
include 'eventmesh-connectors:eventmesh-connector-wecom'
include 'eventmesh-connectors:eventmesh-connector-slack'
include 'eventmesh-connectors:eventmesh-connector-wechat'
+include 'eventmesh-connectors:eventmesh-connector-http'
include 'eventmesh-storage-plugin:eventmesh-storage-api'
include 'eventmesh-storage-plugin:eventmesh-storage-standalone'
diff --git a/tools/dependency-check/known-dependencies.txt
b/tools/dependency-check/known-dependencies.txt
index 7ccc8115b..cb774041c 100644
--- a/tools/dependency-check/known-dependencies.txt
+++ b/tools/dependency-check/known-dependencies.txt
@@ -25,6 +25,7 @@ cache-api-1.1.1.jar
checker-qual-3.12.0.jar
cloudevents-api-2.4.2.jar
cloudevents-core-2.4.2.jar
+cloudevents-http-vertx-2.3.0.jar
cloudevents-json-jackson-2.4.2.jar
cloudevents-kafka-2.4.2.jar
cloudevents-protobuf-2.4.2.jar
@@ -64,6 +65,7 @@ grpc-protobuf-lite-1.43.2.jar
grpc-stub-1.43.2.jar
gson-2.8.2.jar
guava-31.0.1-jre.jar
+guava-retrying-2.0.0.jar
guice-4.2.2.jar
httpasyncclient-4.1.3.jar
httpclient-4.5.13.jar
@@ -117,24 +119,35 @@ nacos-client-2.2.1.jar
nacos-encryption-plugin-2.2.1.jar
netty-3.10.6.Final.jar
netty-all-4.1.79.Final.jar
+netty-buffer-4.1.100.Final.jar
netty-buffer-4.1.79.Final.jar
+netty-codec-4.1.100.Final.jar
netty-codec-4.1.79.Final.jar
+netty-codec-dns-4.1.100.Final.jar
netty-codec-dns-4.1.79.Final.jar
netty-codec-haproxy-4.1.79.Final.jar
-netty-codec-http-4.1.79.Final.jar
+netty-codec-http2-4.1.100.Final.jar
netty-codec-http2-4.1.79.Final.jar
+netty-codec-http-4.1.100.Final.jar
+netty-codec-http-4.1.79.Final.jar
netty-codec-memcache-4.1.79.Final.jar
netty-codec-mqtt-4.1.79.Final.jar
netty-codec-redis-4.1.79.Final.jar
netty-codec-smtp-4.1.79.Final.jar
+netty-codec-socks-4.1.100.Final.jar
netty-codec-socks-4.1.79.Final.jar
netty-codec-stomp-4.1.79.Final.jar
netty-codec-xml-4.1.79.Final.jar
+netty-common-4.1.100.Final.jar
netty-common-4.1.79.Final.jar
+netty-handler-4.1.100.Final.jar
netty-handler-4.1.79.Final.jar
+netty-handler-proxy-4.1.100.Final.jar
netty-handler-proxy-4.1.79.Final.jar
netty-reactive-streams-2.0.4.jar
+netty-resolver-4.1.100.Final.jar
netty-resolver-4.1.79.Final.jar
+netty-resolver-dns-4.1.100.Final.jar
netty-resolver-dns-4.1.79.Final.jar
netty-resolver-dns-classes-macos-4.1.79.Final.jar
netty-resolver-dns-native-macos-4.1.79.Final-osx-aarch_64.jar
@@ -143,14 +156,16 @@ netty-tcnative-boringssl-static-2.0.48.Final.jar
netty-tcnative-boringssl-static-2.0.51.Final.jar
netty-tcnative-classes-2.0.48.Final.jar
netty-tcnative-classes-2.0.51.Final.jar
+netty-transport-4.1.100.Final.jar
netty-transport-4.1.79.Final.jar
netty-transport-classes-epoll-4.1.79.Final.jar
netty-transport-classes-kqueue-4.1.79.Final.jar
+netty-transport-native-epoll-4.1.79.Final.jar
netty-transport-native-epoll-4.1.79.Final-linux-aarch_64.jar
netty-transport-native-epoll-4.1.79.Final-linux-x86_64.jar
-netty-transport-native-epoll-4.1.79.Final.jar
netty-transport-native-kqueue-4.1.79.Final-osx-aarch_64.jar
netty-transport-native-kqueue-4.1.79.Final-osx-x86_64.jar
+netty-transport-native-unix-common-4.1.100.Final.jar
netty-transport-native-unix-common-4.1.79.Final.jar
netty-transport-rxtx-4.1.79.Final.jar
netty-transport-sctp-4.1.79.Final.jar
@@ -185,13 +200,13 @@ pravega-shared-authplugin-0.11.0.jar
pravega-shared-controller-api-0.11.0.jar
pravega-shared-protocol-0.11.0.jar
pravega-shared-security-0.11.0.jar
-proto-google-common-protos-2.0.1.jar
protobuf-java-3.19.2.jar
protobuf-java-3.21.5.jar
protobuf-java-util-3.15.0.jar
protobuf-java-util-3.17.2.jar
protobuf-java-util-3.21.5.jar
protobuf-java-util-3.5.1.jar
+proto-google-common-protos-2.0.1.jar
pulsar-client-2.10.1.jar
pulsar-client-admin-api-2.10.1.jar
pulsar-client-api-2.10.1.jar
@@ -217,6 +232,12 @@ slf4j-api-1.7.30.jar
snakeyaml-1.30.jar
snappy-java-1.1.8.1.jar
validation-api-1.1.0.Final.jar
+vertx-auth-common-4.4.6.jar
+vertx-bridge-common-4.4.6.jar
+vertx-core-4.4.6.jar
+vertx-web-4.4.6.jar
+vertx-web-client-4.0.0.jar
+vertx-web-common-4.4.6.jar
zipkin-2.23.2.jar
zipkin-reporter-2.16.3.jar
zipkin-sender-okhttp3-2.16.3.jar
@@ -224,4 +245,3 @@ zookeeper-3.7.1.jar
zookeeper-jute-3.7.1.jar
zstd-jni-1.5.0-2.jar
zstd-jni-1.5.2-2.jar
-guava-retrying-2.0.0.jar
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]