This is an automated email from the ASF dual-hosted git repository.
mxsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new 3436e5629 [ISSUE #5020] Optimize unit tests and code (#5023)
3436e5629 is described below
commit 3436e5629a1646b5155d4b8d782edbf52e288c5b
Author: Zaki <[email protected]>
AuthorDate: Tue Jul 9 13:48:05 2024 +0800
[ISSUE #5020] Optimize unit tests and code (#5023)
---
.../eventmesh-connector-http/build.gradle | 4 +-
.../http/sink/handle/CommonHttpSinkHandler.java | 23 ++-
.../http/sink/handle/WebhookHttpSinkHandler.java | 38 ++++-
.../http/source/connector/HttpSourceConnector.java | 41 ++++-
.../source/connector/HttpSinkConnectorTest.java | 110 ++++++------
.../source/connector/HttpSourceConnectorTest.java | 185 +++++++++++----------
6 files changed, 238 insertions(+), 163 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index b64f7903b..cfc69259d 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -24,8 +24,10 @@ dependencies {
implementation 'io.vertx:vertx-web-client:4.5.8'
implementation 'dev.failsafe:failsafe:3.3.2'
+
+ testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
+ testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.3.1'
testImplementation 'org.mock-server:mockserver-netty:5.15.0'
- testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
index 7eeba88d6..c6cc90e0e 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
@@ -133,7 +133,14 @@ public class CommonHttpSinkHandler implements HttpSinkHandler {
// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
- Map<String, ?> offset = ((HttpRecordOffset)
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+ Map<String, ?> offset = null;
+ try {
+ // May throw NullPointerException.
+ offset = ((HttpRecordOffset)
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+ } catch (NullPointerException e) {
+ // ignore null pointer exception
+ }
+ final Map<String, ?> finalOffset = offset;
// send the request
return this.webClient.post(url.getPath())
@@ -143,26 +150,28 @@ public class CommonHttpSinkHandler implements HttpSinkHandler {
.ssl(Objects.equals(url.getScheme(), "https"))
.sendJson(httpConnectRecord)
.onSuccess(res -> {
- log.info("Request sent successfully. Record: timestamp={},
offset={}", timestamp, offset);
+ log.info("Request sent successfully. Record: timestamp={},
offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response:
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
- res.statusCode(), timestamp, offset,
res.bodyAsString());
+ res.statusCode(), timestamp, finalOffset,
res.bodyAsString());
} else {
- log.info("Received successful response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
+ log.info("Received successful response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+ finalOffset);
}
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}, responseBody={}",
- res.statusCode(), timestamp, offset,
res.bodyAsString());
+ res.statusCode(), timestamp, finalOffset,
res.bodyAsString());
} else {
- log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
+ log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+ finalOffset);
}
}
})
- .onFailure(err -> log.error("Request failed to send. Record:
timestamp={}, offset={}", timestamp, offset, err));
+ .onFailure(err -> log.error("Request failed to send. Record:
timestamp={}, offset={}", timestamp, finalOffset, err));
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index 9af246bc6..4e64126a9 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -72,6 +72,18 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
// store the received data, when webhook is enabled
private final SynchronizedCircularFifoQueue<HttpExportRecord>
receivedDataQueue;
+ private volatile boolean exportStarted = false;
+
+ private volatile boolean exportDestroyed = false;
+
+ public boolean isExportStarted() {
+ return exportStarted;
+ }
+
+ public boolean isExportDestroyed() {
+ return exportDestroyed;
+ }
+
public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
super(sinkConnectorConfig);
this.sinkConnectorConfig = sinkConnectorConfig;
@@ -179,10 +191,15 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
// start the webclient
super.start();
// start the export server
- Throwable t = this.exportServer.listen().cause();
- if (t != null) {
- throw new EventMeshException("Failed to start Vertx server. ", t);
- }
+ this.exportServer.listen(res -> {
+ if (res.succeeded()) {
+ this.exportStarted = true;
+ log.info("WebhookHttpExportServer started on port: {}",
this.webhookConfig.getPort());
+ } else {
+ log.error("WebhookHttpExportServer failed to start on port:
{}", this.webhookConfig.getPort());
+ throw new EventMeshException("Failed to start Vertx server. ",
res.cause());
+ }
+ });
}
/**
@@ -250,10 +267,15 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
super.stop();
// stop the export server
if (this.exportServer != null) {
- Throwable t = this.exportServer.close().cause();
- if (t != null) {
- throw new EventMeshException("Failed to stop Vertx server. ",
t);
- }
+ this.exportServer.close(res -> {
+ if (res.succeeded()) {
+ this.exportDestroyed = true;
+ log.info("WebhookHttpExportServer stopped on port: {}",
this.webhookConfig.getPort());
+ } else {
+ log.error("WebhookHttpExportServer failed to stop on port:
{}", this.webhookConfig.getPort());
+ throw new EventMeshException("Failed to stop Vertx server.
", res.cause());
+ }
+ });
} else {
log.warn("Callback server is null, ignore.");
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
index b976fed9b..c59915b20 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
@@ -55,6 +55,18 @@ public class HttpSourceConnector implements Source {
private HttpServer server;
+ private volatile boolean started = false;
+
+ private volatile boolean destroyed = false;
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isDestroyed() {
+ return destroyed;
+ }
+
@Override
public Class<? extends Config> configClass() {
@@ -105,10 +117,15 @@ public class HttpSourceConnector implements Source {
@Override
public void start() {
- Throwable t = this.server.listen().cause();
- if (t != null) {
- throw new EventMeshException("failed to start Vertx server", t);
- }
+ this.server.listen(res -> {
+ if (res.succeeded()) {
+ this.started = true;
+ log.info("HttpSourceConnector started on port: {}",
this.sourceConfig.getConnectorConfig().getPort());
+ } else {
+ log.error("HttpSourceConnector failed to start on port: {}",
this.sourceConfig.getConnectorConfig().getPort());
+ throw new EventMeshException("failed to start Vertx server",
res.cause());
+ }
+ });
}
@Override
@@ -123,9 +140,19 @@ public class HttpSourceConnector implements Source {
@Override
public void stop() {
- Throwable t = this.server.close().cause();
- if (t != null) {
- throw new EventMeshException("failed to stop Vertx server", t);
+ if (this.server != null) {
+ this.server.close(res -> {
+ if (res.succeeded()) {
+ this.destroyed = true;
+ log.info("HttpSourceConnector stopped on port: {}",
this.sourceConfig.getConnectorConfig().getPort());
+ } else {
+ log.error("HttpSourceConnector failed to stop on port:
{}", this.sourceConfig.getConnectorConfig().getPort());
+ throw new EventMeshException("failed to stop Vertx
server", res.cause());
+ }
+ }
+ );
+ } else {
+ log.warn("HttpSourceConnector server is null, ignore.");
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index eeba625b0..778d963b5 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.connector.http.source.connector;
+
import static org.mockserver.model.HttpRequest.request;
import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
@@ -25,28 +26,30 @@ import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
+import org.apache.hc.client5.http.fluent.Request;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.net.URIBuilder;
+
import java.net.URI;
+import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockserver.integration.ClientAndServer;
-import org.mockserver.model.HttpRequest;
import org.mockserver.model.HttpResponse;
import org.mockserver.model.MediaType;
+
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
public class HttpSinkConnectorTest {
@@ -54,30 +57,33 @@ public class HttpSinkConnectorTest {
private HttpSinkConfig sinkConfig;
- private URI severUri;
+ private URL url;
private ClientAndServer mockServer;
+ private static final AtomicInteger counter = new AtomicInteger(0);
@BeforeEach
void before() throws Exception {
// init sinkConnector
- this.sinkConnector = new HttpSinkConnector();
- this.sinkConfig = (HttpSinkConfig)
ConfigUtil.parse(sinkConnector.configClass());
- this.sinkConnector.init(this.sinkConfig);
- this.sinkConnector.start();
+ sinkConnector = new HttpSinkConnector();
+ sinkConfig = (HttpSinkConfig)
ConfigUtil.parse(sinkConnector.configClass());
+ sinkConnector.init(this.sinkConfig);
+ sinkConnector.start();
- this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]);
+ url = new URL(sinkConfig.connectorConfig.getUrls()[0]);
// start mockServer
- mockServer = ClientAndServer.startClientAndServer(severUri.getPort());
+ mockServer = ClientAndServer.startClientAndServer(url.getPort());
mockServer.reset()
.when(
request()
.withMethod("POST")
- .withPath(severUri.getPath())
+ .withPath(url.getPath())
)
.respond(
httpRequest -> {
+ // Increase the number of requests received
+ counter.incrementAndGet();
JSONObject requestBody =
JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
@@ -90,6 +96,7 @@ public class HttpSinkConnectorTest {
); // .withDelay(TimeUnit.SECONDS, 10);
}
);
+
}
@AfterEach
@@ -101,62 +108,57 @@ public class HttpSinkConnectorTest {
@Test
void testPut() throws Exception {
// Create a list of ConnectRecord
- final int times = 10;
+ final int size = 10;
List<ConnectRecord> connectRecords = new ArrayList<>();
- for (int i = 0; i < times; i++) {
+ for (int i = 0; i < size; i++) {
ConnectRecord record = createConnectRecord();
connectRecords.add(record);
}
// Put ConnectRecord
sinkConnector.put(connectRecords);
- // sleep 5s
- Thread.sleep(5000);
-
- // verify request
- HttpRequest[] recordedRequests =
mockServer.retrieveRecordedRequests(null);
- // assert recordedRequests.length == times;
+ // wait for receiving request
+ final int times = 5000; // 5 seconds
+ long start = System.currentTimeMillis();
+ while (counter.get() < size) {
+ if (System.currentTimeMillis() - start > times) {
+ // timeout
+ Assertions.fail("The number of requests received=" +
counter.get() + " is less than the number of ConnectRecord=" + size);
+ } else {
+ Thread.sleep(100);
+ }
+ }
// verify response
HttpWebhookConfig webhookConfig =
sinkConfig.connectorConfig.getWebhookConfig();
- String url = new HttpUrl.Builder()
- .scheme("http")
- .host(severUri.getHost())
- .port(webhookConfig.getPort())
- .addPathSegments(webhookConfig.getExportPath())
- .addQueryParameter("pageNum", "1")
- .addQueryParameter("pageSize", "10")
- .addQueryParameter("type", "poll")
- .build().toString();
-
- // build request
- Request request = new Request.Builder()
- .url(url)
- .addHeader("Content-Type", "application/json")
+
+ URI exportUrl = new URIBuilder()
+ .setScheme("http")
+ .setHost(url.getHost())
+ .setPort(webhookConfig.getPort())
+ .setPath(webhookConfig.getExportPath())
+ .addParameter("pageNum", "1")
+ .addParameter("pageSize", "10")
+ .addParameter("type", "poll")
.build();
- OkHttpClient client = new OkHttpClient();
- try (Response response = client.newCall(request).execute()) {
- // check response code
- if (!response.isSuccessful()) {
- throw new RuntimeException("Unexpected response code: " +
response);
- }
- // check response body
- ResponseBody responseBody = response.body();
- if (responseBody != null) {
- JSONObject jsonObject =
JSON.parseObject(responseBody.string());
+ Request.get(exportUrl)
+ .execute()
+ .handleResponse(response -> {
+ // check response code
+ Assertions.assertEquals(HttpStatus.SC_OK, response.getCode());
+ // check response body
+ JSONObject jsonObject =
JSON.parseObject(response.getEntity().getContent());
JSONArray pageItems = jsonObject.getJSONArray("pageItems");
- assert pageItems != null && pageItems.size() == times;
-
- for (int i = 0; i < times; i++) {
+ Assertions.assertNotNull(pageItems);
+ Assertions.assertEquals(size, pageItems.size());
+ for (int i = 0; i < size; i++) {
JSONObject pageItem = pageItems.getJSONObject(i);
- assert pageItem != null;
- // assert pageItem.getJSONObject("data") != null;
- // assert pageItem.getJSONObject("metadata") != null;
+ Assertions.assertNotNull(pageItem);
}
- }
- }
+ return null;
+ });
}
private ConnectRecord createConnectRecord() {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
index b4cad1426..8e3735dd2 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
@@ -24,53 +24,83 @@ import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.util.ConfigUtil;
+import org.apache.hc.client5.http.fluent.Request;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+
+import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-
class HttpSourceConnectorTest {
- private HttpSourceConnector connector;
- private SourceConnectorConfig config;
- private OkHttpClient httpClient;
- private String url;
- private final String expectedMessage = "testHttpMessage";
+ private static HttpSourceConnector connector;
+ private static String url;
+ private static final String expectedMessage = "testHttpMessage";
+ private static final int batchSize = 10;
- @BeforeEach
- void setUp() throws Exception {
+
+ @BeforeAll
+ static void setUpAll() throws Exception {
connector = new HttpSourceConnector();
- HttpSourceConfig sourceConfig = (HttpSourceConfig)
ConfigUtil.parse(connector.configClass());
- config = sourceConfig.getConnectorConfig();
+ final HttpSourceConfig sourceConfig = (HttpSourceConfig)
ConfigUtil.parse(connector.configClass());
+ final SourceConnectorConfig config = sourceConfig.getConnectorConfig();
+ // initialize and start the connector
connector.init(sourceConfig);
connector.start();
- // Add delay to ensure the server is fully started before the tests
begin
- Thread.sleep(2000);
+ // wait for the connector to start
+ long timeout = 5000; // 5 seconds
+ long start = System.currentTimeMillis();
+ while (!connector.isStarted()) {
+ if (System.currentTimeMillis() - start > timeout) {
+ // timeout
+ Assertions.fail("Failed to start the connector");
+ } else {
+ Thread.sleep(100);
+ }
+ }
url = new URL("http", "127.0.0.1", config.getPort(),
config.getPath()).toString();
- httpClient = new OkHttpClient();
}
+ @AfterAll
+ static void tearDownAll() throws IOException {
+ connector.stop();
+ }
+
+
@Test
- void testPoll() throws Exception {
- final int batchSize = 10;
- // test binary content mode
+ void testPollForBinaryRequest() {
for (int i = 0; i < batchSize; i++) {
- try (Response resp = mockBinaryRequest()) {
- Assertions.assertEquals(200, resp.code());
+ try {
+ // Set the request body
+ StringEntity entity = new StringEntity(expectedMessage,
ContentType.TEXT_PLAIN);
+
+ Request.post(url)
+ .addHeader("Content-Type", "text/plain")
+ .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
+ .addHeader("ce-specversion", "1.0")
+ .addHeader("ce-type", "com.example.someevent")
+ .addHeader("ce-source", "/mycontext")
+ .addHeader("ce-subject", "test")
+ .body(entity)
+ .execute()
+ .handleResponse(res -> {
+ Assertions.assertEquals(HttpStatus.SC_OK,
res.getCode());
+ return null;
+ });
+ } catch (IOException e) {
+ Assertions.fail("Failed to send request", e);
}
}
List<ConnectRecord> res = connector.poll();
@@ -78,78 +108,61 @@ class HttpSourceConnectorTest {
for (ConnectRecord r : res) {
Assertions.assertEquals(expectedMessage, new String((byte[])
r.getData()));
}
+ }
- // test structured content mode
+ @Test
+ void testPollForStructuredRequest() {
for (int i = 0; i < batchSize; i++) {
- try (Response resp = mockStructuredRequest()) {
- Assertions.assertEquals(200, resp.code());
+ try {
+ // Create a CloudEvent
+ TestEvent event = new TestEvent();
+ event.id = String.valueOf(UUID.randomUUID());
+ event.specversion = "1.0";
+ event.type = "com.example.someevent";
+ event.source = "/mycontext";
+ event.subject = "test";
+ event.datacontenttype = "text/plain";
+ event.data = expectedMessage;
+
+ // Set the request body
+ StringEntity entity = new
StringEntity(Objects.requireNonNull(JsonUtils.toJSONString(event)),
ContentType.APPLICATION_JSON);
+
+ // Send the request and return the response
+ Request.post(url)
+ .addHeader("Content-Type", "application/cloudevents+json")
+ .body(entity)
+ .execute()
+ .handleResponse(res -> {
+ Assertions.assertEquals(HttpStatus.SC_OK,
res.getCode());
+ return null;
+ });
+ } catch (IOException e) {
+ Assertions.fail("Failed to send request", e);
}
}
- res = connector.poll();
+ List<ConnectRecord> res = connector.poll();
Assertions.assertEquals(batchSize, res.size());
for (ConnectRecord r : res) {
Assertions.assertEquals(expectedMessage, new String((byte[])
r.getData()));
}
-
- // test invalid requests
- Request request = new Request.Builder()
- .url(url)
- .addHeader("Content-Type", "text/plain")
- .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
- .build();
-
- try (Response resp = httpClient.newCall(request).execute()) {
- // verify the response code
- Assertions.assertEquals(405, resp.code());
- }
-
}
- Response mockBinaryRequest() throws Exception {
-
- RequestBody body = RequestBody.create(expectedMessage,
MediaType.parse("text/plain"));
-
- Request request = new Request.Builder()
- .url(url)
- .addHeader("Content-Type", "text/plain")
- .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
- .addHeader("ce-specversion", "1.0")
- .addHeader("ce-type", "com.example.someevent")
- .addHeader("ce-source", "/mycontext")
- .addHeader("ce-subject", "test")
- .post(body)
- .build();
-
- return httpClient.newCall(request).execute();
- }
-
- Response mockStructuredRequest() throws Exception {
- // create a CloudEvent
- TestEvent event = new TestEvent();
- event.id = String.valueOf(UUID.randomUUID());
- event.specversion = "1.0";
- event.type = "com.example.someevent";
- event.source = "/mycontext";
- event.subject = "test";
- event.datacontenttype = "text/plain";
- event.data = expectedMessage;
-
- RequestBody body =
RequestBody.create(Objects.requireNonNull(JsonUtils.toJSONString(event)),
MediaType.parse("application/cloudevents+json"));
- Request request = new Request.Builder()
- .url(url)
- .addHeader("Content-Type", "application/cloudevents+json")
- .post(body)
- .build();
-
- return httpClient.newCall(request).execute();
-
- }
-
- @AfterEach
- void tearDown() {
- connector.stop();
- httpClient.dispatcher().executorService().shutdown();
+ @Test
+ void testPollForInvalidRequest() {
+ // Send a bad request.
+ try {
+ Request.post(url)
+ .addHeader("Content-Type", "text/plain")
+ .execute()
+ .handleResponse(res -> {
+ // Check the response code
+ Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST,
res.getCode());
+ return null;
+ });
+ } catch (IOException e) {
+ Assertions.fail("Failed to send request", e);
+ }
}
class TestEvent {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]