cnzakii commented on code in PR #4837:
URL: https://github.com/apache/eventmesh/pull/4837#discussion_r1569809935


##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig connectorConfig;
+
+    private WebClient webClient;
+
+    private final String type;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<JSONObject> receivedDataQueue;
+
+    public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        SinkConnectorConfig.fillDefault(sinkConnectorConfig);
+        this.connectorConfig = sinkConnectorConfig;
+        this.receivedDataQueue = this.connectorConfig.isWebhook() ? new 
LinkedBlockingQueue<>() : null;
+        type = String.format("%s.%s.%s",
+            sinkConnectorConfig.getConnectorName(),
+            sinkConnectorConfig.isSsl() ? "https" : "http",
+            sinkConnectorConfig.isWebhook() ? "webhook" : "common");
+    }
+
+    /**
+     * Get the oldest data in the queue
+     *
+     * @return received data
+     */
+    public Object getReceivedData() {
+        if (!this.connectorConfig.isWebhook()) {
+            return null;
+        }
+        return this.receivedDataQueue.poll();
+    }
+
+    /**
+     * Get all received data
+     *
+     * @return all received data
+     */
+    public Object[] getAllReceivedData() {
+        if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) {
+            return new Object[0];
+        }
+        Object[] arr = receivedDataQueue.toArray();
+        receivedDataQueue.clear();
+        return arr;
+    }
+
+
+    @Override
+    public void start() {
+        // Create WebClient
+        doInitWebClient();
+    }
+
+    private void doInitWebClient() {
+        final Vertx vertx = Vertx.vertx();
+        // TODO add more configurations
+        WebClientOptions options = new WebClientOptions()
+            .setDefaultHost(this.connectorConfig.getHost())
+            .setDefaultPort(this.connectorConfig.getPort())
+            .setSsl(this.connectorConfig.isSsl())
+            .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+            .setConnectTimeout(this.connectorConfig.getConnectionTimeout());
+
+        this.webClient = WebClient.create(vertx, options);
+    }
+
+
+    @Override
+    public void handle(ConnectRecord record) {
+        // create headers
+        MultiMap headers = HttpHeaders.headers();
+        headers.add(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8");
+        headers.add(HttpHeaderNames.USER_AGENT, "EventMesh-Sink-Connector");

Review Comment:
   I'm going to delete this line of code because it's a "code smell".
   The correct approach is to control whether the header is sent through 
`setUserAgentEnabled()` of `WebClientOptions` (default: `true`), or to set the 
name and enable it through `setUserAgent()` (default name: 
`Vert.x-WebClient/4.4.6`).



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -100,25 +100,25 @@ public void start() {
 
     private void doInitWebClient() {
         final Vertx vertx = Vertx.vertx();
-        // TODO add more configurations
         WebClientOptions options = new WebClientOptions()
             .setDefaultHost(this.connectorConfig.getHost())
             .setDefaultPort(this.connectorConfig.getPort())
             .setSsl(this.connectorConfig.isSsl())
+            .setKeepAlive(this.connectorConfig.isKeepAlive())
+            .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 
1000)
             .setIdleTimeout(this.connectorConfig.getIdleTimeout())
             .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
-            .setConnectTimeout(this.connectorConfig.getConnectionTimeout());
-
+            .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
+            .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
         this.webClient = WebClient.create(vertx, options);
     }
 
 
     @Override
     public void handle(ConnectRecord record) {
         // create headers
-        MultiMap headers = HttpHeaders.headers();
-        headers.add(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8");
-        headers.add(HttpHeaderNames.USER_AGENT, "EventMesh-Sink-Connector");
+        MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");

Review Comment:
   This is a mistake. The correct header name should be 
`HttpHeaderNames.CONTENT_TYPE`. I will fix it and push the change.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig connectorConfig;
+
+    private WebClient webClient;
+
+    private final String type;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<JSONObject> receivedDataQueue;
+
+    public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        SinkConnectorConfig.fillDefault(sinkConnectorConfig);
+        this.connectorConfig = sinkConnectorConfig;
+        this.receivedDataQueue = this.connectorConfig.isWebhook() ? new 
LinkedBlockingQueue<>() : null;
+        type = String.format("%s.%s.%s",
+            sinkConnectorConfig.getConnectorName(),
+            sinkConnectorConfig.isSsl() ? "https" : "http",
+            sinkConnectorConfig.isWebhook() ? "webhook" : "common");
+    }
+
+    /**
+     * Get the oldest data in the queue
+     *
+     * @return received data
+     */
+    public Object getReceivedData() {
+        if (!this.connectorConfig.isWebhook()) {
+            return null;
+        }
+        return this.receivedDataQueue.poll();
+    }
+
+    /**
+     * Get all received data
+     *
+     * @return all received data
+     */
+    public Object[] getAllReceivedData() {
+        if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) {
+            return new Object[0];
+        }
+        Object[] arr = receivedDataQueue.toArray();
+        receivedDataQueue.clear();
+        return arr;
+    }
+
+
+    @Override
+    public void start() {
+        // Create WebClient
+        doInitWebClient();
+    }
+
+    private void doInitWebClient() {
+        final Vertx vertx = Vertx.vertx();
+        // TODO add more configurations
+        WebClientOptions options = new WebClientOptions()
+            .setDefaultHost(this.connectorConfig.getHost())
+            .setDefaultPort(this.connectorConfig.getPort())
+            .setSsl(this.connectorConfig.isSsl())
+            .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+            .setConnectTimeout(this.connectorConfig.getConnectionTimeout());
+
+        this.webClient = WebClient.create(vertx, options);
+    }
+
+
+    @Override
+    public void handle(ConnectRecord record) {
+        // create headers
+        MultiMap headers = HttpHeaders.headers();
+        headers.add(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8");
+        headers.add(HttpHeaderNames.USER_AGENT, "EventMesh-Sink-Connector");

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/5cb55a84f5ea43e10c58613bd8327ec62518846e



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig connectorConfig;
+
+    private WebClient webClient;
+
+    private final String type;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<JSONObject> receivedDataQueue;
+
+    public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig);
+        this.connectorConfig = sinkConnectorConfig;
+        this.receivedDataQueue = this.connectorConfig.isWebhook() ? new 
LinkedBlockingQueue<>() : null;
+        type = String.format("%s.%s.%s",
+            sinkConnectorConfig.getConnectorName(),
+            sinkConnectorConfig.isSsl() ? "https" : "http",
+            sinkConnectorConfig.isWebhook() ? "webhook" : "common");
+    }
+
+    /**
+     * Get the oldest data in the queue
+     *
+     * @return received data
+     */
+    public Object getReceivedData() {
+        if (!this.connectorConfig.isWebhook()) {
+            return null;
+        }
+        return this.receivedDataQueue.poll();
+    }
+
+    /**
+     * Get all received data
+     *
+     * @return all received data
+     */
+    public Object[] getAllReceivedData() {
+        if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) {
+            return new Object[0];
+        }
+        Object[] arr = receivedDataQueue.toArray();
+        receivedDataQueue.clear();
+        return arr;
+    }
+
+
+    @Override
+    public void start() {
+        // Create WebClient
+        doInitWebClient();
+    }
+
+    private void doInitWebClient() {
+        final Vertx vertx = Vertx.vertx();
+        WebClientOptions options = new WebClientOptions()
+            .setDefaultHost(this.connectorConfig.getHost())
+            .setDefaultPort(this.connectorConfig.getPort())
+            .setSsl(this.connectorConfig.isSsl())
+            .setKeepAlive(this.connectorConfig.isKeepAlive())
+            .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 
1000)
+            .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+            .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
+            .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
+        this.webClient = WebClient.create(vertx, options);
+    }
+
+
+    @Override
+    public void handle(ConnectRecord record) {
+        // create headers
+        MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
+
+        // convert ConnectRecord to HttpConnectRecord
+        HttpConnectRecord httpConnectRecord = 
convertToHttpConnectRecord(record);
+
+        // send the request
+        this.webClient.post(this.connectorConfig.getPath())
+            .putHeaders(headers)
+            .sendJson(httpConnectRecord)
+            .onSuccess(res -> {
+                Long timestamp = record.getTimestamp();
+                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
+                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
+                // Determine whether the status code is 200
+                if (res.statusCode() == HttpResponseStatus.OK.code()) {
+                    // store the received data, when webhook is enabled
+                    if (this.connectorConfig.isWebhook()) {
+                        String dataStr = res.body().toString();
+                        if (dataStr.isEmpty()) {
+                            log.warn("Received data is empty.");
+                            return;
+                        }
+                        JSONObject receivedData = JSON.parseObject(dataStr);

Review Comment:
   >Serializing a response body and deserialize it again is a lot performance 
overhead. I think there's no need to output a warning when response body is 
empty.
   
   OK, I will call only one of `res.bodyAsString()` or `res.bodyAsJsonObject()` 
to convert the response body.
   
   >By the way, the bodyAsString() method has a better performance than 
toString().
   
   I don't really agree with you, because `bodyAsString()` essentially also 
calls `toString()` internally (see the screenshots below).
   
   ![2024-04-21_02 12 
29](https://github.com/apache/eventmesh/assets/91261012/6abf91c9-f92c-4c6e-8515-756dd537fe3e)
   
   ![2024-04-21_02 13 
22](https://github.com/apache/eventmesh/assets/91261012/a72af20f-e42b-4e04-b3bc-2c6cd0b4d10d)
   
   
   
   



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.connector;
+
+import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.Objects;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class HttpSinkConnector implements Sink {
+
+    private HttpSinkConfig httpSinkConfig;
+
+    private WebClient webClient;
+
+    private volatile boolean isRunning = false;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return HttpSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) throws Exception {
+        httpSinkConfig = (HttpSinkConfig) config;
+        doInit();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) throws Exception {
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) 
connectorContext;
+        this.httpSinkConfig = (HttpSinkConfig) 
sinkConnectorContext.getSinkConfig();
+        doInit();
+    }
+
+    @SneakyThrows
+    private void doInit() {
+        final Vertx vertx = Vertx.vertx();
+        // TODO Add more configurations
+        WebClientOptions options = new WebClientOptions()

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/5cb55a84f5ea43e10c58613bd8327ec62518846e



##########
eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml:
##########
@@ -30,6 +30,6 @@ connectorConfig:
   port: 8987
   path: /test
   ssl: false
-  idleTimeout: 5
+  idleTimeout: 5000
   webhookConfig:

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java:
##########
@@ -120,14 +120,14 @@ private void sendMessage(ConnectRecord record) {
             .putHeader("Content-Type", "application/json; charset=utf-8")
             .sendJson(record, ar -> {
                 if (ar.succeeded()) {
+                    log.info("[HttpSinkConnector] Successfully send message 
via HTTP. Record: timestamp={}, offset={}", record.getTimestamp(),
+                        record.getPosition().getOffset());
                     if (ar.result().statusCode() != 
HttpResponseStatus.OK.code()) {
-                        log.error("[HttpSinkConnector] Failed to send message 
via HTTP. Response: {}", ar.result());
-                    } else {
-                        log.info("[HttpSinkConnector] Successfully send 
message via HTTP. ");
+                        log.error("[HttpSinkConnector] Unexpected response 
received. StatusCode: {}", ar.result().statusCode());

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f



##########
eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml:
##########
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pubSubConfig:
+  meshAddress: 127.0.0.1:10000
+  subject: TopicTest
+  idc: FT
+  env: PRD
+  group: httpSink
+  appId: 5032
+  userName: httpSinkUser
+  passWord: httpPassWord
+connectorConfig:
+  connectorName: httpSink
+  host: 127.0.0.1
+  port: 8987
+  path: /test
+  ssl: false

Review Comment:
   This is a good idea, but do we support both common HTTP and webhooks? If so, 
I think it can be configured like this:
   ```
   webhook: true
   url:
     - https://webhook.foo.com/callback
     - http://webhook.bar.com:8080/callback
     - ...
   ```



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -83,20 +83,28 @@ private void doInitCallbackServer() {
             .path(this.webhookConfig.getCallbackPath())
             .method(HttpMethod.POST)
             .produces("application/json")
-            .handler(ctx -> {
-                JSONObject callbackData = 
JSON.parseObject(ctx.body().asString());
-                // store callback data
-                if (!this.callbackQueue.offer(callbackData)) {
-                    log.error("Callback data is full, discard the data. Data: 
{}", callbackData);
+            .handler(ctx -> ctx.request().body().onComplete(ar -> {
+                if (ar.succeeded()) {
+                    JSONObject callbackData = 
JSON.parseObject(ar.result().toString());
+                    // store callback data
+                    if (!this.callbackQueue.offer(callbackData)) {
+                        log.error("Callback data is full, discard the data. 
Data: {}", callbackData);

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -83,20 +83,28 @@ private void doInitCallbackServer() {
             .path(this.webhookConfig.getCallbackPath())
             .method(HttpMethod.POST)
             .produces("application/json")
-            .handler(ctx -> {
-                JSONObject callbackData = 
JSON.parseObject(ctx.body().asString());
-                // store callback data
-                if (!this.callbackQueue.offer(callbackData)) {
-                    log.error("Callback data is full, discard the data. Data: 
{}", callbackData);
+            .handler(ctx -> ctx.request().body().onComplete(ar -> {
+                if (ar.succeeded()) {
+                    JSONObject callbackData = 
JSON.parseObject(ar.result().toString());
+                    // store callback data
+                    if (!this.callbackQueue.offer(callbackData)) {
+                        log.error("Callback data is full, discard the data. 
Data: {}", callbackData);
+                    } else {
+                        log.debug("Succeed to store callback data. Data: {}", 
callbackData);
+                    }
+                    // response 200 OK
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+

Review Comment:
   This is a very good suggestion, but I decided to separate the `webhook` 
logic from the `CommonHttpSinkHandler`. I want to make `CommonHttpSinkHandler` 
more "pure" and "generic", not more bloated!
   
   In `CommonHttpSinkHandler`, only the basic request-sending functionality is 
retained: it focuses solely on sending the HTTP request and wrapping the 
response in a `Future`, which is handed to another handler for processing.
   ```
   @Override
       public void handle(ConnectRecord record) {
           for (URI url : this.urls) {
                // .............
               send(url, httpConnectRecord);
           }
       }
   
       public Future<HttpResponse<Buffer>> send(URI url, HttpConnectRecord 
httpConnectRecord) {
            // .............
           return this.webClient.post(url.getPath())
               .host(url.getHost())
               .port(url.getPort())
               .putHeaders(headers)
               .ssl(Objects.equals(url.getScheme(), "https"))
               .sendJson(httpConnectRecord)
               .onSuccess(res -> {
                   // .............
               })
               .onFailure(err -> {
                   // .............
               });
       }
   ```
   In `WebhookHttpSinkHandler`, the message is sent by calling the `send()` 
method of `CommonHttpSinkHandler`, and the handler then attaches its own 
callback (which holds the response body) to the returned `Future`.
   ```
       public void handle(ConnectRecord record) {
           for (URI url : super.getUrls()) {
               // .............
               // send the request
               Future<HttpResponse<Buffer>> responseFuture = send(url, 
httpConnectRecord);
               // handle the response
               doHandle(responseFuture);
           }
       }
   
       private void doHandle(Future<HttpResponse<Buffer>> responseFuture) {
           responseFuture.onSuccess(res -> {
               // ................
           });
       }
   ```
   
   



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.connector;
+
+import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
+import org.apache.eventmesh.openconnect.api.config.Config;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+import java.util.Objects;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+
+@Slf4j
+public class HttpSinkConnector implements Sink {
+
+    private HttpSinkConfig httpSinkConfig;
+
+    private WebClient webClient;
+
+    private volatile boolean isRunning = false;
+
+    @Override
+    public Class<? extends Config> configClass() {
+        return HttpSinkConfig.class;
+    }
+
+    @Override
+    public void init(Config config) throws Exception {
+        httpSinkConfig = (HttpSinkConfig) config;
+        doInit();
+    }
+
+    @Override
+    public void init(ConnectorContext connectorContext) throws Exception {
+        SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) 
connectorContext;
+        this.httpSinkConfig = (HttpSinkConfig) 
sinkConnectorContext.getSinkConfig();
+        doInit();
+    }
+
+    @SneakyThrows
+    private void doInit() {
+        final Vertx vertx = Vertx.vertx();
+        // TODO Add more configurations
+        WebClientOptions options = new WebClientOptions()

Review Comment:
   I also need to use `vertx-junit5` to rewrite the unit tests and simulate 
a more realistic environment to verify my code.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -49,7 +49,7 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
     private final HttpWebhookConfig webhookConfig;
 
     // store the callback data
-    private final BlockingQueue<Object> callbackQueue;
+    private final BlockingQueue<JSONObject> callbackQueue;

Review Comment:
   Resolved in 
https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig connectorConfig;
+
+    private WebClient webClient;
+
+    private final String type;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<JSONObject> receivedDataQueue;
+
+    public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig);
+        this.connectorConfig = sinkConnectorConfig;
+        this.receivedDataQueue = this.connectorConfig.isWebhook() ? new 
LinkedBlockingQueue<>() : null;
+        type = String.format("%s.%s.%s",
+            sinkConnectorConfig.getConnectorName(),
+            sinkConnectorConfig.isSsl() ? "https" : "http",
+            sinkConnectorConfig.isWebhook() ? "webhook" : "common");
+    }
+
+    /**
+     * Get the oldest data in the queue
+     *
+     * @return received data
+     */
+    public Object getReceivedData() {
+        if (!this.connectorConfig.isWebhook()) {
+            return null;
+        }
+        return this.receivedDataQueue.poll();
+    }

Review Comment:
   My original intent was to provide an outlet for exposing callback data, but 
I'm not sure whether to expose it this way or by starting an HTTP server.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig connectorConfig;
+
+    private WebClient webClient;
+
+    private final String type;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<JSONObject> receivedDataQueue;
+
+    public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig);
+        this.connectorConfig = sinkConnectorConfig;
+        this.receivedDataQueue = this.connectorConfig.isWebhook() ? new 
LinkedBlockingQueue<>() : null;
+        type = String.format("%s.%s.%s",
+            sinkConnectorConfig.getConnectorName(),
+            sinkConnectorConfig.isSsl() ? "https" : "http",
+            sinkConnectorConfig.isWebhook() ? "webhook" : "common");
+    }
+
+    /**
+     * Get the oldest data in the queue
+     *
+     * @return received data
+     */
+    public Object getReceivedData() {
+        if (!this.connectorConfig.isWebhook()) {
+            return null;
+        }
+        return this.receivedDataQueue.poll();
+    }
+
+    /**
+     * Get all received data
+     *
+     * @return all received data
+     */
+    public Object[] getAllReceivedData() {
+        if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) {
+            return new Object[0];
+        }
+        Object[] arr = receivedDataQueue.toArray();
+        receivedDataQueue.clear();
+        return arr;
+    }
+
+
+    @Override
+    public void start() {
+        // Create WebClient
+        doInitWebClient();
+    }
+
+    private void doInitWebClient() {
+        final Vertx vertx = Vertx.vertx();
+        WebClientOptions options = new WebClientOptions()
+            .setDefaultHost(this.connectorConfig.getHost())
+            .setDefaultPort(this.connectorConfig.getPort())
+            .setSsl(this.connectorConfig.isSsl())
+            .setKeepAlive(this.connectorConfig.isKeepAlive())
+            .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 
1000)
+            .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+            .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
+            .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
+        this.webClient = WebClient.create(vertx, options);
+    }
+
+
+    @Override
+    public void handle(ConnectRecord record) {
+        // create headers
+        MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
+
+        // convert ConnectRecord to HttpConnectRecord
+        HttpConnectRecord httpConnectRecord = 
convertToHttpConnectRecord(record);
+
+        // send the request
+        this.webClient.post(this.connectorConfig.getPath())
+            .putHeaders(headers)
+            .sendJson(httpConnectRecord)
+            .onSuccess(res -> {
+                Long timestamp = record.getTimestamp();
+                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
+                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
+                // Determine whether the status code is 200
+                if (res.statusCode() == HttpResponseStatus.OK.code()) {
+                    // store the received data, when webhook is enabled
+                    if (this.connectorConfig.isWebhook()) {
+                        String dataStr = res.body().toString();
+                        if (dataStr.isEmpty()) {
+                            log.warn("Received data is empty.");
+                            return;
+                        }
+                        JSONObject receivedData = JSON.parseObject(dataStr);
+                        if (receivedDataQueue.size() == Integer.MAX_VALUE) {
+                            // if the queue is full, remove the oldest element
+                            JSONObject removedData = receivedDataQueue.poll();
+                            log.info("The queue is full, remove the oldest 
element: {}", removedData);
+                        }
+                        boolean b = receivedDataQueue.offer(receivedData);
+                        if (b) {
+                            log.info("Successfully put the received data into 
the queue: {}", receivedData);
+                        } else {
+                            log.error("Failed to put the received data into 
the queue: {}", receivedData);
+                        }
+                    }
+                } else {
+                    log.error("Unexpected response received. Record: 
timestamp={}, offset={}. Response: code={} header={}, body={}",
+                        timestamp,
+                        offset,
+                        res.statusCode(),
+                        res.headers(),
+                        res.body().toString()
+                    );
+                }
+            })
+            .onFailure(err -> {
+                Long timestamp = record.getTimestamp();
+                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
+                log.error("Request failed to send. Record: timestamp={}, 
offset={}", timestamp, offset, err);
+            });
+    }
+
+    /**
+     * Convert ConnectRecord to HttpConnectRecord
+     *
+     * @param record the ConnectRecord to convert
+     * @return the converted HttpConnectRecord
+     */
+    private HttpConnectRecord convertToHttpConnectRecord(ConnectRecord record) 
{
+        HttpConnectRecord httpConnectRecord = new HttpConnectRecord();
+        httpConnectRecord.setType(this.type);
+        LocalDateTime currentTime = LocalDateTime.now();
+        httpConnectRecord.setTimestamp(currentTime.toString());
+        httpConnectRecord.setData(record);
+        return httpConnectRecord;
+    }

Review Comment:
   OK, I will modify it.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.time.LocalDateTime;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.MultiMap;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Common HttpSinkHandler to handle ConnectRecord
+ */
+@Slf4j
+public class CommonHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig connectorConfig;
+
+    private WebClient webClient;
+
+    private final String type;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<JSONObject> receivedDataQueue;
+
+    public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig);
+        this.connectorConfig = sinkConnectorConfig;
+        this.receivedDataQueue = this.connectorConfig.isWebhook() ? new 
LinkedBlockingQueue<>() : null;
+        type = String.format("%s.%s.%s",
+            sinkConnectorConfig.getConnectorName(),
+            sinkConnectorConfig.isSsl() ? "https" : "http",
+            sinkConnectorConfig.isWebhook() ? "webhook" : "common");
+    }
+
+    /**
+     * Get the oldest data in the queue
+     *
+     * @return received data
+     */
+    public Object getReceivedData() {
+        if (!this.connectorConfig.isWebhook()) {
+            return null;
+        }
+        return this.receivedDataQueue.poll();
+    }
+
+    /**
+     * Get all received data
+     *
+     * @return all received data
+     */
+    public Object[] getAllReceivedData() {
+        if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) {
+            return new Object[0];
+        }
+        Object[] arr = receivedDataQueue.toArray();
+        receivedDataQueue.clear();
+        return arr;
+    }
+
+
+    @Override
+    public void start() {
+        // Create WebClient
+        doInitWebClient();
+    }
+
+    private void doInitWebClient() {
+        final Vertx vertx = Vertx.vertx();
+        WebClientOptions options = new WebClientOptions()
+            .setDefaultHost(this.connectorConfig.getHost())
+            .setDefaultPort(this.connectorConfig.getPort())
+            .setSsl(this.connectorConfig.isSsl())
+            .setKeepAlive(this.connectorConfig.isKeepAlive())
+            .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 
1000)
+            .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
+            .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
+            .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
+        this.webClient = WebClient.create(vertx, options);
+    }
+
+
+    @Override
+    public void handle(ConnectRecord record) {
+        // create headers
+        MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
+
+        // convert ConnectRecord to HttpConnectRecord
+        HttpConnectRecord httpConnectRecord = 
convertToHttpConnectRecord(record);
+
+        // send the request
+        this.webClient.post(this.connectorConfig.getPath())
+            .putHeaders(headers)
+            .sendJson(httpConnectRecord)
+            .onSuccess(res -> {
+                Long timestamp = record.getTimestamp();
+                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
+                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
+                // Determine whether the status code is 200
+                if (res.statusCode() == HttpResponseStatus.OK.code()) {
+                    // store the received data, when webhook is enabled
+                    if (this.connectorConfig.isWebhook()) {
+                        String dataStr = res.body().toString();
+                        if (dataStr.isEmpty()) {
+                            log.warn("Received data is empty.");
+                            return;
+                        }
+                        JSONObject receivedData = JSON.parseObject(dataStr);
+                        if (receivedDataQueue.size() == Integer.MAX_VALUE) {
+                            // if the queue is full, remove the oldest element
+                            JSONObject removedData = receivedDataQueue.poll();
+                            log.info("The queue is full, remove the oldest 
element: {}", removedData);
+                        }
+                        boolean b = receivedDataQueue.offer(receivedData);
+                        if (b) {
+                            log.info("Successfully put the received data into 
the queue: {}", receivedData);
+                        } else {
+                            log.error("Failed to put the received data into 
the queue: {}", receivedData);
+                        }
+                    }
+                } else {
+                    log.error("Unexpected response received. Record: 
timestamp={}, offset={}. Response: code={} header={}, body={}",
+                        timestamp,
+                        offset,
+                        res.statusCode(),
+                        res.headers(),
+                        res.body().toString()
+                    );
+                }
+            })
+            .onFailure(err -> {
+                Long timestamp = record.getTimestamp();
+                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
+                log.error("Request failed to send. Record: timestamp={}, 
offset={}", timestamp, offset, err);

Review Comment:
   Retry? But you seemed to mention in 
https://github.com/apache/eventmesh/pull/4837#issuecomment-2061431376 that 
retries have to be done in the Runtime, so I didn't implement the retry function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org
For additional commands, e-mail: issues-h...@eventmesh.apache.org

Reply via email to