cnzakii commented on code in PR #4837: URL: https://github.com/apache/eventmesh/pull/4837#discussion_r1581999255
########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -114,78 +100,70 @@ private void doInitWebClient() { } + /** + * Handles a ConnectRecord by sending it asynchronously to all configured URLs. + * + * @param record the ConnectRecord to handle + */ + @Override + public void multiHandle(ConnectRecord record) { + for (URI url : this.urls) { + // convert ConnectRecord to HttpConnectRecord + String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common"); + HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); + handle(url, httpConnectRecord); + } + } + + + /** + * Sends the HttpConnectRecord to the specified URL using WebClient. + * + * @param url the URL to send the HttpConnectRecord + * @param httpConnectRecord the HttpConnectRecord to send + * @return the Future of the HTTP request + */ @Override - public void handle(ConnectRecord record) { + public Future<HttpResponse<Buffer>> handle(URI url, HttpConnectRecord httpConnectRecord) { // create headers MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8") .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); - // convert ConnectRecord to HttpConnectRecord - HttpConnectRecord httpConnectRecord = convertToHttpConnectRecord(record); + // get timestamp and offset + Long timestamp = httpConnectRecord.getData().getTimestamp(); + Map<String, ?> offset = httpConnectRecord.getData().getPosition().getOffset().getOffset(); // send the request - this.webClient.post(this.connectorConfig.getPath()) + return this.webClient.post(url.getPath()) + .host(url.getHost()) + .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort()) .putHeaders(headers) + .ssl(Objects.equals(url.getScheme(), "https")) .sendJson(httpConnectRecord) .onSuccess(res -> { - Long timestamp = record.getTimestamp(); - Map<String, ?> offset = record.getPosition().getOffset().getOffset(); - log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); - // Determine whether the status code is 200 - if (res.statusCode() == HttpResponseStatus.OK.code()) { - // store the received data, when webhook is enabled - if (this.connectorConfig.isWebhook()) { - String dataStr = res.body().toString(); - if (dataStr.isEmpty()) { - log.warn("Received data is empty."); - return; - } - JSONObject receivedData = JSON.parseObject(dataStr); - if (receivedDataQueue.size() == Integer.MAX_VALUE) { - // if the queue is full, remove the oldest element - JSONObject removedData = receivedDataQueue.poll(); - log.info("The queue is full, remove the oldest element: {}", removedData); - } - boolean b = receivedDataQueue.offer(receivedData); - if (b) { - log.info("Successfully put the received data into the queue: {}", receivedData); - } else { - log.error("Failed to put the received data into the queue: {}", receivedData); - } + if (log.isDebugEnabled()) { + log.debug("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); + } else { + log.info("Request sent successfully."); Review Comment: OK. I will make the modification. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For additional commands, e-mail: issues-h...@eventmesh.apache.org