This is an automated email from the ASF dual-hosted git repository.

mxsm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 3436e5629 [ISSUE #5020] Optimize unit tests and code (#5023)
3436e5629 is described below

commit 3436e5629a1646b5155d4b8d782edbf52e288c5b
Author: Zaki <[email protected]>
AuthorDate: Tue Jul 9 13:48:05 2024 +0800

    [ISSUE #5020] Optimize unit tests and code (#5023)
---
 .../eventmesh-connector-http/build.gradle          |   4 +-
 .../http/sink/handle/CommonHttpSinkHandler.java    |  23 ++-
 .../http/sink/handle/WebhookHttpSinkHandler.java   |  38 ++++-
 .../http/source/connector/HttpSourceConnector.java |  41 ++++-
 .../source/connector/HttpSinkConnectorTest.java    | 110 ++++++------
 .../source/connector/HttpSourceConnectorTest.java  | 185 +++++++++++----------
 6 files changed, 238 insertions(+), 163 deletions(-)

diff --git a/eventmesh-connectors/eventmesh-connector-http/build.gradle b/eventmesh-connectors/eventmesh-connector-http/build.gradle
index b64f7903b..cfc69259d 100644
--- a/eventmesh-connectors/eventmesh-connector-http/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-http/build.gradle
@@ -24,8 +24,10 @@ dependencies {
     implementation 'io.vertx:vertx-web-client:4.5.8'
     implementation 'dev.failsafe:failsafe:3.3.2'
 
+
+    testImplementation 'org.apache.httpcomponents.client5:httpclient5:5.3.1'
+    testImplementation 'org.apache.httpcomponents.client5:httpclient5-fluent:5.3.1'
     testImplementation 'org.mock-server:mockserver-netty:5.15.0'
-    testImplementation 'com.squareup.okhttp3:okhttp:4.12.0'
     compileOnly 'org.projectlombok:lombok'
     annotationProcessor 'org.projectlombok:lombok'
 }
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
index 7eeba88d6..c6cc90e0e 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
@@ -133,7 +133,14 @@ public class CommonHttpSinkHandler implements HttpSinkHandler {
 
         // get timestamp and offset
         Long timestamp = httpConnectRecord.getData().getTimestamp();
-        Map<String, ?> offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+        Map<String, ?> offset = null;
+        try {
+            // May throw NullPointerException.
+            offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+        } catch (NullPointerException e) {
+            // ignore null pointer exception
+        }
+        final Map<String, ?> finalOffset = offset;
 
         // send the request
         return this.webClient.post(url.getPath())
@@ -143,26 +150,28 @@ public class CommonHttpSinkHandler implements HttpSinkHandler {
             .ssl(Objects.equals(url.getScheme(), "https"))
             .sendJson(httpConnectRecord)
             .onSuccess(res -> {
-                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
+                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, finalOffset);
                 // log the response
                 if (HttpUtils.is2xxSuccessful(res.statusCode())) {
                     if (log.isDebugEnabled()) {
                         log.debug("Received successful response: 
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
-                            res.statusCode(), timestamp, offset, 
res.bodyAsString());
+                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
                     } else {
-                        log.info("Received successful response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
+                        log.info("Received successful response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+                            finalOffset);
                     }
                 } else {
                     if (log.isDebugEnabled()) {
                         log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}, responseBody={}",
-                            res.statusCode(), timestamp, offset, 
res.bodyAsString());
+                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
                     } else {
-                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
+                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+                            finalOffset);
                     }
                 }
 
             })
-            .onFailure(err -> log.error("Request failed to send. Record: 
timestamp={}, offset={}", timestamp, offset, err));
+            .onFailure(err -> log.error("Request failed to send. Record: 
timestamp={}, offset={}", timestamp, finalOffset, err));
     }
 
 
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
index 9af246bc6..4e64126a9 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
@@ -72,6 +72,18 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
     // store the received data, when webhook is enabled
     private final SynchronizedCircularFifoQueue<HttpExportRecord> 
receivedDataQueue;
 
+    private volatile boolean exportStarted = false;
+
+    private volatile boolean exportDestroyed = false;
+
+    public boolean isExportStarted() {
+        return exportStarted;
+    }
+
+    public boolean isExportDestroyed() {
+        return exportDestroyed;
+    }
+
     public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
         super(sinkConnectorConfig);
         this.sinkConnectorConfig = sinkConnectorConfig;
@@ -179,10 +191,15 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
         // start the webclient
         super.start();
         // start the export server
-        Throwable t = this.exportServer.listen().cause();
-        if (t != null) {
-            throw new EventMeshException("Failed to start Vertx server. ", t);
-        }
+        this.exportServer.listen(res -> {
+            if (res.succeeded()) {
+                this.exportStarted = true;
+                log.info("WebhookHttpExportServer started on port: {}", 
this.webhookConfig.getPort());
+            } else {
+                log.error("WebhookHttpExportServer failed to start on port: 
{}", this.webhookConfig.getPort());
+                throw new EventMeshException("Failed to start Vertx server. ", 
res.cause());
+            }
+        });
     }
 
     /**
@@ -250,10 +267,15 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
         super.stop();
         // stop the export server
         if (this.exportServer != null) {
-            Throwable t = this.exportServer.close().cause();
-            if (t != null) {
-                throw new EventMeshException("Failed to stop Vertx server. ", 
t);
-            }
+            this.exportServer.close(res -> {
+                if (res.succeeded()) {
+                    this.exportDestroyed = true;
+                    log.info("WebhookHttpExportServer stopped on port: {}", 
this.webhookConfig.getPort());
+                } else {
+                    log.error("WebhookHttpExportServer failed to stop on port: 
{}", this.webhookConfig.getPort());
+                    throw new EventMeshException("Failed to stop Vertx server. 
", res.cause());
+                }
+            });
         } else {
             log.warn("Callback server is null, ignore.");
         }
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
index b976fed9b..c59915b20 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnector.java
@@ -55,6 +55,18 @@ public class HttpSourceConnector implements Source {
 
     private HttpServer server;
 
+    private volatile boolean started = false;
+
+    private volatile boolean destroyed = false;
+
+    public boolean isStarted() {
+        return started;
+    }
+
+    public boolean isDestroyed() {
+        return destroyed;
+    }
+
 
     @Override
     public Class<? extends Config> configClass() {
@@ -105,10 +117,15 @@ public class HttpSourceConnector implements Source {
 
     @Override
     public void start() {
-        Throwable t = this.server.listen().cause();
-        if (t != null) {
-            throw new EventMeshException("failed to start Vertx server", t);
-        }
+        this.server.listen(res -> {
+            if (res.succeeded()) {
+                this.started = true;
+                log.info("HttpSourceConnector started on port: {}", 
this.sourceConfig.getConnectorConfig().getPort());
+            } else {
+                log.error("HttpSourceConnector failed to start on port: {}", 
this.sourceConfig.getConnectorConfig().getPort());
+                throw new EventMeshException("failed to start Vertx server", 
res.cause());
+            }
+        });
     }
 
     @Override
@@ -123,9 +140,19 @@ public class HttpSourceConnector implements Source {
 
     @Override
     public void stop() {
-        Throwable t = this.server.close().cause();
-        if (t != null) {
-            throw new EventMeshException("failed to stop Vertx server", t);
+        if (this.server != null) {
+            this.server.close(res -> {
+                    if (res.succeeded()) {
+                        this.destroyed = true;
+                        log.info("HttpSourceConnector stopped on port: {}", 
this.sourceConfig.getConnectorConfig().getPort());
+                    } else {
+                        log.error("HttpSourceConnector failed to stop on port: 
{}", this.sourceConfig.getConnectorConfig().getPort());
+                        throw new EventMeshException("failed to stop Vertx 
server", res.cause());
+                    }
+                }
+            );
+        } else {
+            log.warn("HttpSourceConnector server is null, ignore.");
         }
     }
 
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
index eeba625b0..778d963b5 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSinkConnectorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.connector.http.source.connector;
 
+
 import static org.mockserver.model.HttpRequest.request;
 
 import org.apache.eventmesh.connector.http.sink.HttpSinkConnector;
@@ -25,28 +26,30 @@ import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import org.apache.eventmesh.openconnect.util.ConfigUtil;
 
+import org.apache.hc.client5.http.fluent.Request;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.net.URIBuilder;
+
 import java.net.URI;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockserver.integration.ClientAndServer;
-import org.mockserver.model.HttpRequest;
 import org.mockserver.model.HttpResponse;
 import org.mockserver.model.MediaType;
 
+
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 
-import okhttp3.HttpUrl;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import okhttp3.ResponseBody;
 
 public class HttpSinkConnectorTest {
 
@@ -54,30 +57,33 @@ public class HttpSinkConnectorTest {
 
     private HttpSinkConfig sinkConfig;
 
-    private URI severUri;
+    private URL url;
 
     private ClientAndServer mockServer;
 
+    private static final AtomicInteger counter = new AtomicInteger(0);
 
     @BeforeEach
     void before() throws Exception {
         // init sinkConnector
-        this.sinkConnector = new HttpSinkConnector();
-        this.sinkConfig = (HttpSinkConfig) 
ConfigUtil.parse(sinkConnector.configClass());
-        this.sinkConnector.init(this.sinkConfig);
-        this.sinkConnector.start();
+        sinkConnector = new HttpSinkConnector();
+        sinkConfig = (HttpSinkConfig) 
ConfigUtil.parse(sinkConnector.configClass());
+        sinkConnector.init(this.sinkConfig);
+        sinkConnector.start();
 
-        this.severUri = URI.create(sinkConfig.connectorConfig.getUrls()[0]);
+        url = new URL(sinkConfig.connectorConfig.getUrls()[0]);
         // start mockServer
-        mockServer = ClientAndServer.startClientAndServer(severUri.getPort());
+        mockServer = ClientAndServer.startClientAndServer(url.getPort());
         mockServer.reset()
             .when(
                 request()
                     .withMethod("POST")
-                    .withPath(severUri.getPath())
+                    .withPath(url.getPath())
             )
             .respond(
                 httpRequest -> {
+                    // Increase the number of requests received
+                    counter.incrementAndGet();
                     JSONObject requestBody = 
JSON.parseObject(httpRequest.getBodyAsString());
                     return HttpResponse.response()
                         .withContentType(MediaType.APPLICATION_JSON)
@@ -90,6 +96,7 @@ public class HttpSinkConnectorTest {
                         ); // .withDelay(TimeUnit.SECONDS, 10);
                 }
             );
+
     }
 
     @AfterEach
@@ -101,62 +108,57 @@ public class HttpSinkConnectorTest {
     @Test
     void testPut() throws Exception {
         // Create a list of ConnectRecord
-        final int times = 10;
+        final int size = 10;
         List<ConnectRecord> connectRecords = new ArrayList<>();
-        for (int i = 0; i < times; i++) {
+        for (int i = 0; i < size; i++) {
             ConnectRecord record = createConnectRecord();
             connectRecords.add(record);
         }
         // Put ConnectRecord
         sinkConnector.put(connectRecords);
 
-        // sleep 5s
-        Thread.sleep(5000);
-
-        // verify request
-        HttpRequest[] recordedRequests = 
mockServer.retrieveRecordedRequests(null);
-        // assert recordedRequests.length == times;
+        // wait for receiving request
+        final int times = 5000; // 5 seconds
+        long start = System.currentTimeMillis();
+        while (counter.get() < size) {
+            if (System.currentTimeMillis() - start > times) {
+                // timeout
+                Assertions.fail("The number of requests received=" + 
counter.get() + " is less than the number of ConnectRecord=" + size);
+            } else {
+                Thread.sleep(100);
+            }
+        }
 
         // verify response
         HttpWebhookConfig webhookConfig = 
sinkConfig.connectorConfig.getWebhookConfig();
-        String url = new HttpUrl.Builder()
-            .scheme("http")
-            .host(severUri.getHost())
-            .port(webhookConfig.getPort())
-            .addPathSegments(webhookConfig.getExportPath())
-            .addQueryParameter("pageNum", "1")
-            .addQueryParameter("pageSize", "10")
-            .addQueryParameter("type", "poll")
-            .build().toString();
-
-        // build request
-        Request request = new Request.Builder()
-            .url(url)
-            .addHeader("Content-Type", "application/json")
+
+        URI exportUrl = new URIBuilder()
+            .setScheme("http")
+            .setHost(url.getHost())
+            .setPort(webhookConfig.getPort())
+            .setPath(webhookConfig.getExportPath())
+            .addParameter("pageNum", "1")
+            .addParameter("pageSize", "10")
+            .addParameter("type", "poll")
             .build();
 
-        OkHttpClient client = new OkHttpClient();
-        try (Response response = client.newCall(request).execute()) {
-            // check response code
-            if (!response.isSuccessful()) {
-                throw new RuntimeException("Unexpected response code: " + 
response);
-            }
-            // check response body
-            ResponseBody responseBody = response.body();
-            if (responseBody != null) {
-                JSONObject jsonObject = 
JSON.parseObject(responseBody.string());
+        Request.get(exportUrl)
+            .execute()
+            .handleResponse(response -> {
+                // check response code
+                Assertions.assertEquals(HttpStatus.SC_OK, response.getCode());
+                // check response body
+                JSONObject jsonObject = 
JSON.parseObject(response.getEntity().getContent());
                 JSONArray pageItems = jsonObject.getJSONArray("pageItems");
 
-                assert pageItems != null && pageItems.size() == times;
-
-                for (int i = 0; i < times; i++) {
+                Assertions.assertNotNull(pageItems);
+                Assertions.assertEquals(size, pageItems.size());
+                for (int i = 0; i < size; i++) {
                     JSONObject pageItem = pageItems.getJSONObject(i);
-                    assert pageItem != null;
-                    // assert pageItem.getJSONObject("data") != null;
-                    // assert pageItem.getJSONObject("metadata") != null;
+                    Assertions.assertNotNull(pageItem);
                 }
-            }
-        }
+                return null;
+            });
     }
 
     private ConnectRecord createConnectRecord() {
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
index b4cad1426..8e3735dd2 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/source/connector/HttpSourceConnectorTest.java
@@ -24,53 +24,83 @@ import org.apache.eventmesh.common.utils.JsonUtils;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 import org.apache.eventmesh.openconnect.util.ConfigUtil;
 
+import org.apache.hc.client5.http.fluent.Request;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+
+import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
-import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-
 class HttpSourceConnectorTest {
 
-    private HttpSourceConnector connector;
-    private SourceConnectorConfig config;
-    private OkHttpClient httpClient;
-    private String url;
-    private final String expectedMessage = "testHttpMessage";
+    private static HttpSourceConnector connector;
+    private static String url;
+    private static final String expectedMessage = "testHttpMessage";
+    private static final int batchSize = 10;
 
-    @BeforeEach
-    void setUp() throws Exception {
+
+    @BeforeAll
+    static void setUpAll() throws Exception {
         connector = new HttpSourceConnector();
-        HttpSourceConfig sourceConfig = (HttpSourceConfig) 
ConfigUtil.parse(connector.configClass());
-        config = sourceConfig.getConnectorConfig();
+        final HttpSourceConfig sourceConfig = (HttpSourceConfig) 
ConfigUtil.parse(connector.configClass());
+        final SourceConnectorConfig config = sourceConfig.getConnectorConfig();
+        // initialize and start the connector
         connector.init(sourceConfig);
         connector.start();
 
-        // Add delay to ensure the server is fully started before the tests 
begin
-        Thread.sleep(2000);
+        // wait for the connector to start
+        long timeout = 5000; // 5 seconds
+        long start = System.currentTimeMillis();
+        while (!connector.isStarted()) {
+            if (System.currentTimeMillis() - start > timeout) {
+                // timeout
+                Assertions.fail("Failed to start the connector");
+            } else {
+                Thread.sleep(100);
+            }
+        }
 
         url = new URL("http", "127.0.0.1", config.getPort(), 
config.getPath()).toString();
-        httpClient = new OkHttpClient();
     }
 
+    @AfterAll
+    static void tearDownAll() throws IOException {
+        connector.stop();
+    }
+
+
     @Test
-    void testPoll() throws Exception {
-        final int batchSize = 10;
-        // test binary content mode
+    void testPollForBinaryRequest() {
         for (int i = 0; i < batchSize; i++) {
-            try (Response resp = mockBinaryRequest()) {
-                Assertions.assertEquals(200, resp.code());
+            try {
+                // Set the request body
+                StringEntity entity = new StringEntity(expectedMessage, 
ContentType.TEXT_PLAIN);
+
+                Request.post(url)
+                    .addHeader("Content-Type", "text/plain")
+                    .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
+                    .addHeader("ce-specversion", "1.0")
+                    .addHeader("ce-type", "com.example.someevent")
+                    .addHeader("ce-source", "/mycontext")
+                    .addHeader("ce-subject", "test")
+                    .body(entity)
+                    .execute()
+                    .handleResponse(res -> {
+                        Assertions.assertEquals(HttpStatus.SC_OK, 
res.getCode());
+                        return null;
+                    });
+            } catch (IOException e) {
+                Assertions.fail("Failed to send request", e);
             }
         }
         List<ConnectRecord> res = connector.poll();
@@ -78,78 +108,61 @@ class HttpSourceConnectorTest {
         for (ConnectRecord r : res) {
             Assertions.assertEquals(expectedMessage, new String((byte[]) 
r.getData()));
         }
+    }
 
-        // test structured content mode
+    @Test
+    void testPollForStructuredRequest() {
         for (int i = 0; i < batchSize; i++) {
-            try (Response resp = mockStructuredRequest()) {
-                Assertions.assertEquals(200, resp.code());
+            try {
+                // Create a CloudEvent
+                TestEvent event = new TestEvent();
+                event.id = String.valueOf(UUID.randomUUID());
+                event.specversion = "1.0";
+                event.type = "com.example.someevent";
+                event.source = "/mycontext";
+                event.subject = "test";
+                event.datacontenttype = "text/plain";
+                event.data = expectedMessage;
+
+                // Set the request body
+                StringEntity entity = new 
StringEntity(Objects.requireNonNull(JsonUtils.toJSONString(event)), 
ContentType.APPLICATION_JSON);
+
+                // Send the request and return the response
+                Request.post(url)
+                    .addHeader("Content-Type", "application/cloudevents+json")
+                    .body(entity)
+                    .execute()
+                    .handleResponse(res -> {
+                        Assertions.assertEquals(HttpStatus.SC_OK, 
res.getCode());
+                        return null;
+                    });
+            } catch (IOException e) {
+                Assertions.fail("Failed to send request", e);
             }
         }
-        res = connector.poll();
+        List<ConnectRecord> res = connector.poll();
         Assertions.assertEquals(batchSize, res.size());
         for (ConnectRecord r : res) {
             Assertions.assertEquals(expectedMessage, new String((byte[]) 
r.getData()));
         }
-
-        // test invalid requests
-        Request request = new Request.Builder()
-            .url(url)
-            .addHeader("Content-Type", "text/plain")
-            .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
-            .build();
-
-        try (Response resp = httpClient.newCall(request).execute()) {
-            // verify the response code
-            Assertions.assertEquals(405, resp.code());
-        }
-
     }
 
-    Response mockBinaryRequest() throws Exception {
-
-        RequestBody body = RequestBody.create(expectedMessage, 
MediaType.parse("text/plain"));
-
-        Request request = new Request.Builder()
-            .url(url)
-            .addHeader("Content-Type", "text/plain")
-            .addHeader("ce-id", String.valueOf(UUID.randomUUID()))
-            .addHeader("ce-specversion", "1.0")
-            .addHeader("ce-type", "com.example.someevent")
-            .addHeader("ce-source", "/mycontext")
-            .addHeader("ce-subject", "test")
-            .post(body)
-            .build();
-
-        return httpClient.newCall(request).execute();
-    }
-
-    Response mockStructuredRequest() throws Exception {
-        // create a CloudEvent
-        TestEvent event = new TestEvent();
-        event.id = String.valueOf(UUID.randomUUID());
-        event.specversion = "1.0";
-        event.type = "com.example.someevent";
-        event.source = "/mycontext";
-        event.subject = "test";
-        event.datacontenttype = "text/plain";
-        event.data = expectedMessage;
-
-        RequestBody body = 
RequestBody.create(Objects.requireNonNull(JsonUtils.toJSONString(event)), 
MediaType.parse("application/cloudevents+json"));
 
-        Request request = new Request.Builder()
-            .url(url)
-            .addHeader("Content-Type", "application/cloudevents+json")
-            .post(body)
-            .build();
-
-        return httpClient.newCall(request).execute();
-
-    }
-
-    @AfterEach
-    void tearDown() {
-        connector.stop();
-        httpClient.dispatcher().executorService().shutdown();
+    @Test
+    void testPollForInvalidRequest() {
+        // Send a bad request.
+        try {
+            Request.post(url)
+                .addHeader("Content-Type", "text/plain")
+                .execute()
+                .handleResponse(res -> {
+                    // Check the response code
+                    Assertions.assertEquals(HttpStatus.SC_BAD_REQUEST, 
res.getCode());
+                    return null;
+                });
+        } catch (IOException e) {
+            Assertions.fail("Failed to send request", e);
+        }
     }
 
     class TestEvent {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to