cnzakii commented on code in PR #4837:
URL: https://github.com/apache/eventmesh/pull/4837#discussion_r1581999307


##########
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java:
##########
@@ -114,78 +100,70 @@ private void doInitWebClient() {
     }
 
 
+    /**
+     * Handles a ConnectRecord by sending it asynchronously to all configured 
URLs.
+     *
+     * @param record the ConnectRecord to handle
+     */
+    @Override
+    public void multiHandle(ConnectRecord record) {
+        for (URI url : this.urls) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s", 
connectorConfig.getConnectorName(), url.getScheme(), "common");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+            handle(url, httpConnectRecord);
+        }
+    }
+
+
+    /**
+     * Sends the HttpConnectRecord to the specified URL using WebClient.
+     *
+     * @param url               the URL to send the HttpConnectRecord
+     * @param httpConnectRecord the HttpConnectRecord to send
+     * @return the Future of the HTTP request
+     */
     @Override
-    public void handle(ConnectRecord record) {
+    public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord 
httpConnectRecord) {
         // create headers
         MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
             .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
 
-        // convert ConnectRecord to HttpConnectRecord
-        HttpConnectRecord httpConnectRecord = 
convertToHttpConnectRecord(record);
+        // get timestamp and offset
+        Long timestamp = httpConnectRecord.getData().getTimestamp();
+        Map<String, ?> offset = 
httpConnectRecord.getData().getPosition().getOffset().getOffset();
 
         // send the request
-        this.webClient.post(this.connectorConfig.getPath())
+        return this.webClient.post(url.getPath())
+            .host(url.getHost())
+            .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
             .putHeaders(headers)
+            .ssl(Objects.equals(url.getScheme(), "https"))
             .sendJson(httpConnectRecord)
             .onSuccess(res -> {
-                Long timestamp = record.getTimestamp();
-                Map<String, ?> offset = 
record.getPosition().getOffset().getOffset();
-                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, offset);
-                // Determine whether the status code is 200
-                if (res.statusCode() == HttpResponseStatus.OK.code()) {
-                    // store the received data, when webhook is enabled
-                    if (this.connectorConfig.isWebhook()) {
-                        String dataStr = res.body().toString();
-                        if (dataStr.isEmpty()) {
-                            log.warn("Received data is empty.");
-                            return;
-                        }
-                        JSONObject receivedData = JSON.parseObject(dataStr);
-                        if (receivedDataQueue.size() == Integer.MAX_VALUE) {
-                            // if the queue is full, remove the oldest element
-                            JSONObject removedData = receivedDataQueue.poll();
-                            log.info("The queue is full, remove the oldest 
element: {}", removedData);
-                        }
-                        boolean b = receivedDataQueue.offer(receivedData);
-                        if (b) {
-                            log.info("Successfully put the received data into 
the queue: {}", receivedData);
-                        } else {
-                            log.error("Failed to put the received data into 
the queue: {}", receivedData);
-                        }
+                if (log.isDebugEnabled()) {
+                    log.debug("Request sent successfully. Record: 
timestamp={}, offset={}", timestamp, offset);
+                } else {
+                    log.info("Request sent successfully.");
+                }
+                // log the response
+                if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received successful response: 
statusCode={}, responseBody={}", res.statusCode(), res.bodyAsString());
                     }
                 } else {
-                    log.error("Unexpected response received. Record: 
timestamp={}, offset={}. Response: code={} header={}, body={}",
-                        timestamp,
-                        offset,
-                        res.statusCode(),
-                        res.headers(),
-                        res.body().toString()
-                    );
+                    log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp, offset);
                 }

Review Comment:
   OK. I will make the modification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org
For additional commands, e-mail: issues-h...@eventmesh.apache.org

Reply via email to