Pil0tXia commented on code in PR #4837:
URL: https://github.com/apache/eventmesh/pull/4837#discussion_r1581861139


##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.config;
+
+import lombok.Data;
+
+@Data
+public class HttpRetryConfig {
+    // maximum number of attempts to retry, default 3, if set to 0 or 1, no 
retry
+    private int maxAttempts = 3;

Review Comment:
   How about retrying for 1 time when it's set to `1`?



##########
eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml:
##########
@@ -26,13 +26,20 @@ pubSubConfig:
   passWord: httpPassWord
 connectorConfig:
   connectorName: httpSink
-  host: 127.0.0.1
-  port: 8987
-  path: /test
+  urls:
+   - http://127.0.0.1:8987/test
   ssl: false
-  webhook: false
   keepAlive: true
   keepAliveTimeout: 60000
   idleTimeout: 5000   # timeunit: ms, recommended scope: common(5s - 10s), 
webhook(15s - 60s)
   connectionTimeout: 5000   # timeunit: ms, recommended scope: 5 - 10s
-  maxConnectionPoolSize: 5
\ No newline at end of file
+  maxConnectionPoolSize: 5
+  retryConfig:
+    maxAttempts: 3
+    interval: 1000
+    retryAll: false

Review Comment:
   How about renaming `retryAll` to `retryOnNonSuccess` to convey a clearer 
meaning?



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java:
##########
@@ -61,7 +64,23 @@ public void init(ConnectorContext connectorContext) throws 
Exception {
 
     @SneakyThrows
     private void doInit() {
-        this.sinkHandler = new 
CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);
+        // Fill default values if absent
+        
SinkConnectorConfig.populateFieldsWithDefaults(this.httpSinkConfig.connectorConfig);
+        // Create different handlers for different configurations
+        HttpSinkHandler sinkHandler0;
+        if 
(this.httpSinkConfig.connectorConfig.getWebhookConfig().isActivate()) {
+            sinkHandler0 = new 
WebhookHttpSinkHandler(this.httpSinkConfig.connectorConfig);
+        } else {
+            sinkHandler0 = new 
CommonHttpSinkHandler(this.httpSinkConfig.connectorConfig);

Review Comment:
   Adding numbers to the end of variable names is not typically a best 
practice. How about `nonRetryHandler` or somewhat else here?



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -114,78 +100,70 @@ private void doInitWebClient() {
     }
 
 
+    /**
+     * Handles a ConnectRecord by sending it asynchronously to all configured 
URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : this.urls) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
connectorConfig.getConnectorName(), url.getScheme(), "common");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            handle(url, httpConnectRecord);
+        }
+    }
+
+
+    /**
+     * Sends the HttpConnectRecord to the specified URL using WebClient.
+     *
+     * @param url               the URL to send the HttpConnectRecord
+     * @param httpConnectRecord the HttpConnectRecord to send
+     * @return the Future of the HTTP request
+     */
     @Override
-    public void handle(ConnectRecord record) {
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
         // create headers
         MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
             .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");

Review Comment:
   Adding a user-agent header here may be good.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml:
##########
@@ -26,13 +26,20 @@ pubSubConfig:
   passWord: httpPassWord
 connectorConfig:
   connectorName: httpSink
-  host: 127.0.0.1
-  port: 8987
-  path: /test
+  urls:
+   - http://127.0.0.1:8987/test
   ssl: false
-  webhook: false
   keepAlive: true
   keepAliveTimeout: 60000
   idleTimeout: 5000   # timeunit: ms, recommended scope: common(5s - 10s), 
webhook(15s - 60s)
   connectionTimeout: 5000   # timeunit: ms, recommended scope: 5 - 10s
-  maxConnectionPoolSize: 5
\ No newline at end of file
+  maxConnectionPoolSize: 5
+  retryConfig:
+    maxAttempts: 3
+    interval: 1000
+    retryAll: false
+  webhookConfig:
+    activate: false
+    exportPath: /export
+    port: 8988
+    idleTimeout: 5000

Review Comment:
   I saw your `SinkConnectorConfig#populateFieldsWithDefaults()` method, which 
is a good design. Shall we unify the `connectorConfig.idleTimeout` at 
   L34 and `connectorConfig.webhookConfig.idleTimeout` here together?



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -114,78 +100,70 @@ private void doInitWebClient() {
     }
 
 
+    /**
+     * Handles a ConnectRecord by sending it asynchronously to all configured 
URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : this.urls) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
connectorConfig.getConnectorName(), url.getScheme(), "common");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            handle(url, httpConnectRecord);
+        }
+    }
+
+
+    /**
+     * Sends the HttpConnectRecord to the specified URL using WebClient.
+     *
+     * @param url               the URL to send the HttpConnectRecord
+     * @param httpConnectRecord the HttpConnectRecord to send
+     * @return the Future of the HTTP request
+     */
     @Override
-    public void handle(ConnectRecord record) {
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
         // create headers
         MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
             .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
 
-        // convert ConnectRecord to HttpConnectRecord
-        HttpConnectRecord httpConnectRecord = 
convertToHttpConnectRecord(record);
+        // get timestamp and offset
+        Long timestamp = httpConnectRecord.getData().getTimestamp();
+        Map<String, ?> offset = 
httpConnectRecord.getData().getPosition().getOffset().getOffset();
 
         // send the request
-        this.webClient.post(this.connectorConfig.getPath())
+        return this.webClient.post(url.getPath())
+            .host(url.getHost())
+            .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
             .putHeaders(headers)
+            .ssl(Objects.equals(url.getScheme(), "https"))
             .sendJson(httpConnectRecord)
             .onSuccess(res -> {
-                Long timestamp = record.getTimestamp();
-                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
-                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
-                // Determine whether the status code is 200
-                if (res.statusCode() == HttpResponseStatus.OK.code()) {
-                    // store the received data, when webhook is enabled
-                    if (this.connectorConfig.isWebhook()) {
-                        String dataStr = res.body().toString();
-                        if (dataStr.isEmpty()) {
-                            log.warn("Received data is empty.");
-                            return;
-                        }
-                        JSONObject receivedData = JSON.parseObject(dataStr);
-                        if (receivedDataQueue.size() == Integer.MAX_VALUE) {
-                            // if the queue is full, remove the oldest element
-                            JSONObject removedData = receivedDataQueue.poll();
-                            log.info("The queue is full, remove the oldest 
element: {}", removedData);
-                        }
-                        boolean b = receivedDataQueue.offer(receivedData);
-                        if (b) {
-                            log.info("Successfully put the received data into 
the queue: {}", receivedData);
-                        } else {
-                            log.error("Failed to put the received data into 
the queue: {}", receivedData);
-                        }
+                if (log.isDebugEnabled()) {
+                    log.debug("Request sent successfully. Record: 
timestamp={}, offset={}", timestamp, offset);
+                } else {
+                    log.info("Request sent successfully.");
+                }
+                // log the response
+                if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received successful response: 
statusCode={}, responseBody={}", res.statusCode(), res.bodyAsString());
                     }
                 } else {
-                    log.error("Unexpected response received. Record: 
timestamp={}, offset={}. Response: code={} header={}, body={}",
-                        timestamp,
-                        offset,
-                        res.statusCode(),
-                        res.headers(),
-                        res.body().toString()
-                    );
+                    log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
                 }

Review Comment:
   We need to output detailed logs under `debug` level and brief ones under 
`info` level.
   
   ```java
                   if (HttpUtils.is2xxSuccessful(res.statusCode())) {
                       if (log.isDebugEnabled()) {
                           log.debug("Received successful response: 
statusCode={}, responseBody={}", res.statusCode(), res.bodyAsString());
                       } else {
                           log.info("Received successful response: 
statusCode={}", res.statusCode());
                       }
                   } else {
                       if (log.isDebugEnabled()) {
                           log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}, responseBody={}", res.statusCode(), timestamp, 
offset, res.bodyAsString());
                       } else {
                           log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
                       }
                   }
   ```



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Extends CommonHttpSinkHandler to provide additional functionality for 
handling webhook features, including sending requests to callback servers,
+ * allowing longer response wait times, storing responses returned from 
callback servers, and exposing received data through an HTTP service.
+ */
+@Slf4j
+public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
+
+    // the configuration for webhook
+    private final HttpWebhookConfig webhookConfig;
+
+    // the server for exporting the received data
+    private HttpServer exportServer;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<Object> receivedDataQueue;
+
+    public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        super(sinkConnectorConfig);
+        this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
+        this.receivedDataQueue = new LinkedBlockingQueue<>();
+        // init the export server
+        doInitExportServer();
+    }
+
+    /**
+     * Initialize the server for exporting the received data
+     */
+    private void doInitExportServer() {
+        final Vertx vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        // add logger handler
+        router.route().handler(LoggerHandler.create());
+        // add export handler
+        router.route()
+            .path(this.webhookConfig.getExportPath())
+            .method(HttpMethod.GET)
+            .produces("application/json")
+            .handler(ctx -> {
+                // get received data
+                Object data = this.receivedDataQueue.poll();
+                if (data != null) {
+
+                    // export the received data
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .send(JSONObject.of("data", data).toJSONString());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Succeed to export callback data. Data: {}", 
data);
+                    } else {
+                        log.info("Succeed to export callback data.");
+                    }
+                } else {
+                    // no data to export
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.NO_CONTENT.code())
+                        .end();
+                    log.info("No callback data to export.");
+                }
+            });
+        // create the export server
+        this.exportServer = vertx.createHttpServer(new HttpServerOptions()
+            .setPort(this.webhookConfig.getPort())
+            .setIdleTimeout(this.webhookConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
+    }
+
+    /**
+     * Starts the HTTP/HTTPS handler by creating a WebClient with configured 
options and starting the export server.
+     */
+    @Override
+    public void start() {
+        // start the webclient
+        super.start();
+        // start the export server
+        Throwable t = this.exportServer.listen().cause();
+        if (t != null) {
+            throw new EventMeshException("Failed to start Vertx server. ", t);
+        }
+    }
+
+    /**
+     * Processes the ConnectRecord multiple times by sending it over HTTP or 
HTTPS to all configured URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : super.getUrls()) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            // handle the HttpConnectRecord
+            handle(url, httpConnectRecord);
+        }
+    }
+
+    /**
+     * Processes the ConnectRecord once by sending it over HTTP or HTTPS to 
the specified URL. If the status code is 2xx, the received data will be
+     * stored in the queue.
+     *
+     * @param url               the URL to send the ConnectRecord to
+     * @param httpConnectRecord the ConnectRecord to handle
+     * @return the Future of the HTTP request
+     */
+    @Override
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
+        // send the request
+        Future<HttpResponse<Buffer>> responseFuture = super.handle(url, 
httpConnectRecord);
+        // store the received data
+        return responseFuture.onSuccess(res -> {
+            // Determine whether the status code is 2xx
+            if (!HttpUtils.is2xxSuccessful(res.statusCode())) {
+                return;
+            }
+            // Get the received data
+            String receivedData = res.bodyAsString();
+            if (receivedData.isEmpty()) {
+                log.warn("Received data is empty.");
+                return;
+            }
+            // If the queue is full, remove the oldest element
+            if (receivedDataQueue.size() == Integer.MAX_VALUE) {

Review Comment:
   `Integer.MAX_VALUE` is too large, this will definitely blow up connector's 
memory.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -114,78 +100,70 @@ private void doInitWebClient() {
     }
 
 
+    /**
+     * Handles a ConnectRecord by sending it asynchronously to all configured 
URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : this.urls) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
connectorConfig.getConnectorName(), url.getScheme(), "common");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            handle(url, httpConnectRecord);
+        }
+    }
+
+
+    /**
+     * Sends the HttpConnectRecord to the specified URL using WebClient.
+     *
+     * @param url               the URL to send the HttpConnectRecord
+     * @param httpConnectRecord the HttpConnectRecord to send
+     * @return the Future of the HTTP request
+     */
     @Override
-    public void handle(ConnectRecord record) {
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
         // create headers
         MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
             .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
 
-        // convert ConnectRecord to HttpConnectRecord
-        HttpConnectRecord httpConnectRecord = 
convertToHttpConnectRecord(record);
+        // get timestamp and offset
+        Long timestamp = httpConnectRecord.getData().getTimestamp();
+        Map<String, ?> offset = 
httpConnectRecord.getData().getPosition().getOffset().getOffset();
 
         // send the request
-        this.webClient.post(this.connectorConfig.getPath())
+        return this.webClient.post(url.getPath())
+            .host(url.getHost())
+            .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
             .putHeaders(headers)
+            .ssl(Objects.equals(url.getScheme(), "https"))
             .sendJson(httpConnectRecord)
             .onSuccess(res -> {
-                Long timestamp = record.getTimestamp();
-                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
-                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
-                // Determine whether the status code is 200
-                if (res.statusCode() == HttpResponseStatus.OK.code()) {
-                    // store the received data, when webhook is enabled
-                    if (this.connectorConfig.isWebhook()) {
-                        String dataStr = res.body().toString();
-                        if (dataStr.isEmpty()) {
-                            log.warn("Received data is empty.");
-                            return;
-                        }
-                        JSONObject receivedData = JSON.parseObject(dataStr);
-                        if (receivedDataQueue.size() == Integer.MAX_VALUE) {
-                            // if the queue is full, remove the oldest element
-                            JSONObject removedData = receivedDataQueue.poll();
-                            log.info("The queue is full, remove the oldest 
element: {}", removedData);
-                        }
-                        boolean b = receivedDataQueue.offer(receivedData);
-                        if (b) {
-                            log.info("Successfully put the received data into 
the queue: {}", receivedData);
-                        } else {
-                            log.error("Failed to put the received data into 
the queue: {}", receivedData);
-                        }
+                if (log.isDebugEnabled()) {
+                    log.debug("Request sent successfully. Record: 
timestamp={}, offset={}", timestamp, offset);
+                } else {
+                    log.info("Request sent successfully.");

Review Comment:
   timestamp and offset don't have serialization overhead and can be seen in 
one line, so `log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset)` is enough here~



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java:
##########
@@ -17,27 +17,57 @@
 
 package org.apache.eventmesh.connector.http.sink.handle;
 
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
+import java.net.URI;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
 /**
- * Any class that needs to process ConnectRecord via HTTP needs to implement 
this interface.
+ * Interface for handling ConnectRecords via HTTP or HTTPS. Classes 
implementing this interface are responsible for processing ConnectRecords by
+ * sending them over HTTP or HTTPS, with additional support for handling 
multiple requests and asynchronous processing.
+ *
+ * <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should 
implement this interface.
+ * Implementing classes must provide implementations for the {@link #start()}, 
{@link #multiHandle(ConnectRecord)},
+ * {@link #handle(URI, HttpConnectRecord)}, and {@link #stop()} methods.</p>
+ *
+ * <p>Implementing classes should ensure thread safety and handle HTTP/HTTPS 
communication efficiently.
+ * The {@link #start()} method initializes any necessary resources for 
HTTP/HTTPS communication.
+ * The {@link #multiHandle(ConnectRecord)} method processes a ConnectRecord 
multiple times by sending it over HTTP or HTTPS.
+ * The {@link #handle(URI, HttpConnectRecord)} method processes a single 
ConnectRecord by sending it over HTTP or HTTPS to the specified URL.
+ * The {@link #stop()} method releases any resources used for HTTP/HTTPS 
communication.</p>
+ *
+ * <p>It's recommended to handle exceptions gracefully within the {@link 
#handle(URI, HttpConnectRecord)} method
+ * to prevent message loss or processing interruptions.</p>
  */
 public interface HttpSinkHandler {
 
     /**
-     * start the handler
+     * Initializes the HTTP/HTTPS handler. This method should be called before 
using the handler.
      */
     void start();
 
     /**
-     * Handle the ConnectRecord.
+     * Processes the ConnectRecord multiple times.
      *
      * @param record the ConnectRecord to handle
      */
-    void handle(ConnectRecord record);
+    void multiHandle(ConnectRecord record);

Review Comment:
   Your Javadoc is very detailed, and I first read the Javadoc when reviewing. 
However, I still need to check the implementation of the `multiHandle()` method 
to be sure that `handle()` is called once for each element in the `urlList`.
   
   Personally, if the `multiHandle()` method is named `handleUrlList()` or 
`handleList()`, perhaps the call relationship between the two can be seen 
directly from the method name. However, this is just my personal preference, 
`multiHandle()` as a more abstract name, is also a good choice.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Extends CommonHttpSinkHandler to provide additional functionality for 
handling webhook features, including sending requests to callback servers,
+ * allowing longer response wait times, storing responses returned from 
callback servers, and exposing received data through an HTTP service.
+ */
+@Slf4j
+public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
+
+    // the configuration for webhook
+    private final HttpWebhookConfig webhookConfig;
+
+    // the server for exporting the received data
+    private HttpServer exportServer;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<Object> receivedDataQueue;
+
+    public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        super(sinkConnectorConfig);
+        this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
+        this.receivedDataQueue = new LinkedBlockingQueue<>();
+        // init the export server
+        doInitExportServer();
+    }
+
+    /**
+     * Initialize the server for exporting the received data
+     */
+    private void doInitExportServer() {
+        final Vertx vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        // add logger handler
+        router.route().handler(LoggerHandler.create());
+        // add export handler
+        router.route()
+            .path(this.webhookConfig.getExportPath())
+            .method(HttpMethod.GET)
+            .produces("application/json")
+            .handler(ctx -> {
+                // get received data
+                Object data = this.receivedDataQueue.poll();
+                if (data != null) {
+
+                    // export the received data
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .send(JSONObject.of("data", data).toJSONString());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Succeed to export callback data. Data: {}", 
data);
+                    } else {
+                        log.info("Succeed to export callback data.");
+                    }
+                } else {
+                    // no data to export
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.NO_CONTENT.code())
+                        .end();
+                    log.info("No callback data to export.");
+                }
+            });
+        // create the export server
+        this.exportServer = vertx.createHttpServer(new HttpServerOptions()
+            .setPort(this.webhookConfig.getPort())
+            .setIdleTimeout(this.webhookConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
+    }
+
+    /**
+     * Starts the HTTP/HTTPS handler by creating a WebClient with configured 
options and starting the export server.
+     */
+    @Override
+    public void start() {
+        // start the webclient
+        super.start();
+        // start the export server
+        Throwable t = this.exportServer.listen().cause();
+        if (t != null) {
+            throw new EventMeshException("Failed to start Vertx server. ", t);
+        }
+    }
+
+    /**
+     * Processes the ConnectRecord multiple times by sending it over HTTP or 
HTTPS to all configured URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : super.getUrls()) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            // handle the HttpConnectRecord
+            handle(url, httpConnectRecord);
+        }
+    }
+
+    /**
+     * Processes the ConnectRecord once by sending it over HTTP or HTTPS to 
the specified URL. If the status code is 2xx, the received data will be
+     * stored in the queue.
+     *
+     * @param url               the URL to send the ConnectRecord to
+     * @param httpConnectRecord the ConnectRecord to handle
+     * @return the Future of the HTTP request
+     */
+    @Override
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
+        // send the request
+        Future<HttpResponse<Buffer>> responseFuture = super.handle(url, 
httpConnectRecord);
+        // store the received data
+        return responseFuture.onSuccess(res -> {
+            // Determine whether the status code is 2xx
+            if (!HttpUtils.is2xxSuccessful(res.statusCode())) {
+                return;
+            }
+            // Get the received data
+            String receivedData = res.bodyAsString();
+            if (receivedData.isEmpty()) {
+                log.warn("Received data is empty.");
+                return;
+            }

Review Comment:
   As is said in 
https://github.com/apache/eventmesh/pull/4837#discussion_r1573319164, I think 
there's no need to output a warning when response body is empty. Because an 
empty response might be common on specific servers and will output a lot 
useless warnings in console.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Extends CommonHttpSinkHandler to provide additional functionality for 
handling webhook features, including sending requests to callback servers,
+ * allowing longer response wait times, storing responses returned from 
callback servers, and exposing received data through an HTTP service.
+ */
+@Slf4j
+public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
+
+    // the configuration for webhook
+    private final HttpWebhookConfig webhookConfig;
+
+    // the server for exporting the received data
+    private HttpServer exportServer;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<Object> receivedDataQueue;
+
+    public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        super(sinkConnectorConfig);
+        this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
+        this.receivedDataQueue = new LinkedBlockingQueue<>();
+        // init the export server
+        doInitExportServer();
+    }
+
+    /**
+     * Initialize the server for exporting the received data
+     */
+    private void doInitExportServer() {
+        final Vertx vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        // add logger handler
+        router.route().handler(LoggerHandler.create());
+        // add export handler
+        router.route()
+            .path(this.webhookConfig.getExportPath())
+            .method(HttpMethod.GET)
+            .produces("application/json")
+            .handler(ctx -> {
+                // get received data
+                Object data = this.receivedDataQueue.poll();
+                if (data != null) {
+
+                    // export the received data
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .send(JSONObject.of("data", data).toJSONString());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Succeed to export callback data. Data: {}", 
data);
+                    } else {
+                        log.info("Succeed to export callback data.");
+                    }
+                } else {
+                    // no data to export
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.NO_CONTENT.code())
+                        .end();
+                    log.info("No callback data to export.");
+                }
+            });
+        // create the export server
+        this.exportServer = vertx.createHttpServer(new HttpServerOptions()
+            .setPort(this.webhookConfig.getPort())
+            .setIdleTimeout(this.webhookConfig.getIdleTimeout())
+            .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)).requestHandler(router);
+    }
+
+    /**
+     * Starts the HTTP/HTTPS handler by creating a WebClient with configured 
options and starting the export server.
+     */
+    @Override
+    public void start() {
+        // start the webclient
+        super.start();
+        // start the export server
+        Throwable t = this.exportServer.listen().cause();
+        if (t != null) {
+            throw new EventMeshException("Failed to start Vertx server. ", t);
+        }
+    }
+
+    /**
+     * Processes the ConnectRecord multiple times by sending it over HTTP or 
HTTPS to all configured URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : super.getUrls()) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            // handle the HttpConnectRecord
+            handle(url, httpConnectRecord);
+        }
+    }
+
+    /**
+     * Processes the ConnectRecord once by sending it over HTTP or HTTPS to 
the specified URL. If the status code is 2xx, the received data will be
+     * stored in the queue.
+     *
+     * @param url               the URL to send the ConnectRecord to
+     * @param httpConnectRecord the ConnectRecord to handle
+     * @return the Future of the HTTP request
+     */
+    @Override
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
+        // send the request
+        Future<HttpResponse<Buffer>> responseFuture = super.handle(url, 
httpConnectRecord);
+        // store the received data
+        return responseFuture.onSuccess(res -> {
+            // Determine whether the status code is 2xx
+            if (!HttpUtils.is2xxSuccessful(res.statusCode())) {
+                return;
+            }
+            // Get the received data
+            String receivedData = res.bodyAsString();

Review Comment:
   For `/export` endpoint, we need to display all deliveries' result to users, 
including 2xx, non-2xx and network-failures.
   
   We might want to store something more than responseBody in 
`receivedDataQueue`, like `statusCode`, `callbackUrl` and `receivedTime`.



##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java:
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handle;
+
+
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.http.sink.config.HttpWebhookConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.handler.LoggerHandler;
+
+import com.alibaba.fastjson2.JSONObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Extends CommonHttpSinkHandler to provide additional functionality for 
handling webhook features, including sending requests to callback servers,
+ * allowing longer response wait times, storing responses returned from 
callback servers, and exposing received data through an HTTP service.
+ */
+@Slf4j
+public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
+
+    // the configuration for webhook
+    private final HttpWebhookConfig webhookConfig;
+
+    // the server for exporting the received data
+    private HttpServer exportServer;
+
+    // store the received data, when webhook is enabled
+    private final BlockingQueue<Object> receivedDataQueue;
+
+    public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
+        super(sinkConnectorConfig);
+        this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
+        this.receivedDataQueue = new LinkedBlockingQueue<>();
+        // init the export server
+        doInitExportServer();
+    }
+
+    /**
+     * Initialize the server for exporting the received data
+     */
+    private void doInitExportServer() {
+        final Vertx vertx = Vertx.vertx();
+        final Router router = Router.router(vertx);
+        // add logger handler
+        router.route().handler(LoggerHandler.create());
+        // add export handler
+        router.route()
+            .path(this.webhookConfig.getExportPath())
+            .method(HttpMethod.GET)
+            .produces("application/json")
+            .handler(ctx -> {
+                // get received data
+                Object data = this.receivedDataQueue.poll();
+                if (data != null) {
+
+                    // export the received data
+                    ctx.response()
+                        .putHeader(HttpHeaders.CONTENT_TYPE, 
"application/json; charset=utf-8")
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .send(JSONObject.of("data", data).toJSONString());
+                    if (log.isDebugEnabled()) {
+                        log.debug("Succeed to export callback data. Data: {}", 
data);

Review Comment:
   How about returning multiple data in batches at once? That is, pagination, 
which can reduce the number of times the endpoint is called.
   
   It is necessary to remind users in the `/export` document that the request 
to the endpoint is not idempotent, and the data stored on the server will be 
deleted after the request. This may be a bit inconvenient. It would be better 
if there is a GET path parameter, the default value of which is not to delete 
data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org
For additional commands, e-mail: issues-h...@eventmesh.apache.org


Reply via email to