This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cb8df531 [ISSUE #5077] HTTP Sink Connector supports result callback 
(#5078)
8cb8df531 is described below

commit 8cb8df531774fb017851bd5d2bae908a5098bdb2
Author: Zaki <[email protected]>
AuthorDate: Fri Aug 16 17:49:10 2024 +0800

    [ISSUE #5077] HTTP Sink Connector supports result callback (#5078)
    
    * feat: Support CallBack for ConnectRecord
    
    * doc: Improve some documentation
    
    * feat: Support for multi-server data callbacks
    
    * perf: Optimize some logic
---
 .../http/common/SynchronizedCircularFifoQueue.java |   3 +-
 .../connector/http/sink/HttpSinkConnector.java     |  10 +-
 .../http/sink/config/HttpRetryConfig.java          |   4 +-
 .../http/sink/data/HttpConnectRecord.java          |  44 ++++-
 .../http/sink/data/HttpExportMetadata.java         |  10 +-
 .../connector/http/sink/data/HttpExportRecord.java |   6 +-
 .../http/sink/data/HttpExportRecordPage.java       |   5 +-
 .../connector/http/sink/data/HttpRetryEvent.java   |  80 ++++++++
 .../http/sink/data/MultiHttpRequestContext.java    |  64 +++++++
 .../http/sink/handle/RetryHttpSinkHandler.java     | 206 ---------------------
 .../http/sink/handler/AbstractHttpSinkHandler.java |  88 +++++++++
 .../sink/{handle => handler}/HttpSinkHandler.java  |  14 +-
 .../impl}/CommonHttpSinkHandler.java               | 200 +++++++++++++-------
 .../handler/impl/HttpSinkHandlerRetryWrapper.java  | 120 ++++++++++++
 .../impl}/WebhookHttpSinkHandler.java              |  91 +++++----
 .../connector/http/sink/HttpSinkConnectorTest.java |   2 +-
 16 files changed, 603 insertions(+), 344 deletions(-)

diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
index 439a9f3d7..0564e5873 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/common/SynchronizedCircularFifoQueue.java
@@ -120,9 +120,10 @@ public class SynchronizedCircularFifoQueue<E> extends 
CircularFifoQueue<E> {
      */
     public synchronized List<E> fetchRange(int start, int end, boolean 
removed) {
 
-        if (start < 0 || end > this.size() || start > end) {
+        if (start < 0 || start > end) {
             throw new IllegalArgumentException("Invalid range");
         }
+        end = Math.min(end, this.size());
 
         Iterator<E> iterator = this.iterator();
         List<E> items = new ArrayList<>(end - start);
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
index 8a1475637..9b6038bde 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java
@@ -20,10 +20,10 @@ package org.apache.eventmesh.connector.http.sink;
 import org.apache.eventmesh.common.config.connector.Config;
 import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig;
 import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
-import org.apache.eventmesh.connector.http.sink.handle.CommonHttpSinkHandler;
-import org.apache.eventmesh.connector.http.sink.handle.HttpSinkHandler;
-import org.apache.eventmesh.connector.http.sink.handle.RetryHttpSinkHandler;
-import org.apache.eventmesh.connector.http.sink.handle.WebhookHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
+import 
org.apache.eventmesh.connector.http.sink.handler.impl.CommonHttpSinkHandler;
+import 
org.apache.eventmesh.connector.http.sink.handler.impl.HttpSinkHandlerRetryWrapper;
+import 
org.apache.eventmesh.connector.http.sink.handler.impl.WebhookHttpSinkHandler;
 import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
 import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
 import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
@@ -86,7 +86,7 @@ public class HttpSinkConnector implements Sink, 
ConnectorCreateService<Sink> {
             this.sinkHandler = nonRetryHandler;
         } else if (maxRetries > 0) {
             // Wrap the sink handler with a retry handler
-            this.sinkHandler = new 
RetryHttpSinkHandler(this.httpSinkConfig.connectorConfig, nonRetryHandler);
+            this.sinkHandler = new 
HttpSinkHandlerRetryWrapper(this.httpSinkConfig.connectorConfig, 
nonRetryHandler);
         } else {
             throw new IllegalArgumentException("Max retries must be greater 
than or equal to 0.");
         }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
index 0bceac7d4..08c3a323e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/config/HttpRetryConfig.java
@@ -24,8 +24,8 @@ public class HttpRetryConfig {
     // maximum number of retries, default 2, minimum 0
     private int maxRetries = 2;
 
-    // retry interval, default 2000ms
-    private int interval = 2000;
+    // retry interval, default 1000ms
+    private int interval = 1000;
 
     // Default value is false, indicating that only requests with 
network-level errors will be retried.
     // If set to true, all failed requests will be retried, including 
network-level errors and non-2xx responses.
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
index a258c6ab5..95b40afe9 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpConnectRecord.java
@@ -20,31 +20,60 @@ package org.apache.eventmesh.connector.http.sink.data;
 import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
+import java.io.Serializable;
 import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
 import lombok.Builder;
-import lombok.Data;
+import lombok.Getter;
 
 /**
  * a special ConnectRecord for HttpSinkConnector
  */
-@Data
+@Getter
 @Builder
-public class HttpConnectRecord {
+public class HttpConnectRecord implements Serializable {
 
-    private String type;
+    private static final long serialVersionUID = 5271462532332251473L;
+
+    /**
+     * The unique identifier for the HttpConnectRecord
+     */
+    private final String httpRecordId = UUID.randomUUID().toString();
 
-    private String time;
+    /**
+     * The time when the HttpConnectRecord was created
+     */
+    private LocalDateTime createTime;
 
-    private String uuid;
+    /**
+     * The type of the HttpConnectRecord
+     */
+    private String type;
 
+    /**
+     * The event id of the HttpConnectRecord
+     */
     private String eventId;
 
+    /**
+     * The ConnectRecord to be sent
+     */
     private ConnectRecord data;
 
+    @Override
+    public String toString() {
+        return "HttpConnectRecord{"
+            + "createTime=" + createTime
+            + ", httpRecordId='" + httpRecordId
+            + ", type='" + type
+            + ", eventId='" + eventId
+            + ", data=" + data
+            + '}';
+    }
+
     /**
      * Convert ConnectRecord to HttpConnectRecord
      *
@@ -62,11 +91,8 @@ public class HttpConnectRecord {
         }
         return HttpConnectRecord.builder()
             .type(type)
-            .time(LocalDateTime.now().toString())
-            .uuid(UUID.randomUUID().toString())
             .eventId(type + "-" + offset)
             .data(record)
             .build();
     }
-
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
index 848012f15..41a508787 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportMetadata.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.connector.http.sink.data;
 
+import java.io.Serializable;
 import java.time.LocalDateTime;
 
 import lombok.Builder;
@@ -27,7 +28,10 @@ import lombok.Data;
  */
 @Data
 @Builder
-public class HttpExportMetadata {
+public class HttpExportMetadata implements Serializable {
+
+    private static final long serialVersionUID = 1121010466793041920L;
+
     private String url;
 
     private int code;
@@ -36,7 +40,9 @@ public class HttpExportMetadata {
 
     private LocalDateTime receivedTime;
 
-    private String uuid;
+    private String httpRecordId;
+
+    private String recordId;
 
     private String retriedBy;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
index b6382aee7..c6bdb0288 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecord.java
@@ -17,6 +17,8 @@
 
 package org.apache.eventmesh.connector.http.sink.data;
 
+import java.io.Serializable;
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
@@ -25,7 +27,9 @@ import lombok.Data;
  */
 @Data
 @AllArgsConstructor
-public class HttpExportRecord {
+public class HttpExportRecord implements Serializable {
+
+    private static final long serialVersionUID = 6010283911452947157L;
 
     private HttpExportMetadata metadata;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
index 5c44eb3b7..81e582c33 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpExportRecordPage.java
@@ -17,6 +17,7 @@
 
 package org.apache.eventmesh.connector.http.sink.data;
 
+import java.io.Serializable;
 import java.util.List;
 
 import lombok.AllArgsConstructor;
@@ -27,7 +28,9 @@ import lombok.Data;
  */
 @Data
 @AllArgsConstructor
-public class HttpExportRecordPage {
+public class HttpExportRecordPage implements Serializable {
+
+    private static final long serialVersionUID = 1143791658357035990L;
 
     private int pageNum;
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
new file mode 100644
index 000000000..4b229f983
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.data;
+
+import lombok.Data;
+
+/**
+ * Single HTTP retry event
+ */
+@Data
+public class HttpRetryEvent {
+
+    public static final String PREFIX = "http-retry-event-";
+
+    private String parentId;
+
+    private int maxRetries;
+
+    private int currentRetries;
+
+    private Throwable lastException;
+
+    /**
+     * Increase the current retries by 1
+     */
+    public void increaseCurrentRetries() {
+        this.currentRetries++;
+    }
+
+    /**
+     * Check if the current retries is greater than or equal to the max retries
+     * @return true if the current retries is greater than or equal to the max 
retries
+     */
+    public boolean isMaxRetriesReached() {
+        return this.currentRetries >= this.maxRetries;
+    }
+
+    /**
+     * Get the limited exception message with the default limit of 256
+     * @return the limited exception message
+     */
+    public String getLimitedExceptionMessage() {
+        return getLimitedExceptionMessage(256);
+    }
+
+    /**
+     * Get the limited exception message with the specified limit
+     * @param maxLimit the maximum limit of the exception message
+     * @return the limited exception message
+     */
+    public String getLimitedExceptionMessage(int maxLimit) {
+        if (lastException == null) {
+            return "";
+        }
+        String message = lastException.getMessage();
+        if (message == null) {
+            return "";
+        }
+        if (message.length() > maxLimit) {
+            return message.substring(0, maxLimit);
+        }
+        return message;
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
new file mode 100644
index 000000000..67ab94381
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.data;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Multi HTTP request context
+ */
+public class MultiHttpRequestContext {
+
+    public static final String NAME = "multi-http-request-context";
+
+    /**
+     * The remaining requests to be processed.
+     */
+    private final AtomicInteger remainingRequests;
+
+    /**
+     * The last failed event.
+     * If there are no retries or retries are not enabled, it will be null.
+     * If retries occur but still fail, it will be logged, and only the last 
one will be retained.
+     */
+    private HttpRetryEvent lastFailedEvent;
+
+    public MultiHttpRequestContext(int remainingEvents) {
+        this.remainingRequests = new AtomicInteger(remainingEvents);
+    }
+
+    /**
+     * Decrement the remaining requests by 1.
+     */
+    public void decrementRemainingRequests() {
+        remainingRequests.decrementAndGet();
+    }
+
+    public int getRemainingRequests() {
+        return remainingRequests.get();
+    }
+
+    public HttpRetryEvent getLastFailedEvent() {
+        return lastFailedEvent;
+    }
+
+    public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
+        this.lastFailedEvent = lastFailedEvent;
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
deleted file mode 100644
index bc2a53610..000000000
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/RetryHttpSinkHandler.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eventmesh.connector.http.sink.handle;
-
-import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
-import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
-import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
-import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
-import org.apache.eventmesh.connector.http.util.HttpUtils;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
-
-import java.net.ConnectException;
-import java.net.URI;
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import io.vertx.core.Future;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpResponse;
-
-import lombok.extern.slf4j.Slf4j;
-
-import dev.failsafe.Failsafe;
-import dev.failsafe.RetryPolicy;
-import dev.failsafe.RetryPolicyBuilder;
-import dev.failsafe.event.ExecutionEvent;
-
-
-@Slf4j
-public class RetryHttpSinkHandler implements HttpSinkHandler {
-
-    private final SinkConnectorConfig connectorConfig;
-
-    // Retry policy builder
-    private RetryPolicyBuilder<HttpResponse<Buffer>> retryPolicyBuilder;
-
-    private final List<URI> urls;
-
-    private final HttpSinkHandler sinkHandler;
-
-
-    public RetryHttpSinkHandler(SinkConnectorConfig connectorConfig, 
HttpSinkHandler sinkHandler) {
-        this.connectorConfig = connectorConfig;
-        this.sinkHandler = sinkHandler;
-
-        // Initialize retry
-        initRetry();
-
-        // Initialize URLs
-        String[] urlStrings = connectorConfig.getUrls();
-        this.urls = Arrays.stream(urlStrings)
-            .map(URI::create)
-            .collect(Collectors.toList());
-    }
-
-    private void initRetry() {
-        HttpRetryConfig httpRetryConfig = 
this.connectorConfig.getRetryConfig();
-
-        this.retryPolicyBuilder = RetryPolicy.<HttpResponse<Buffer>>builder()
-            .handleIf(e -> e instanceof ConnectException)
-            .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() 
&& !HttpUtils.is2xxSuccessful(response.statusCode()))
-            .withMaxRetries(httpRetryConfig.getMaxRetries())
-            .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()));
-    }
-
-
-    /**
-     * Initializes the WebClient for making HTTP requests based on the 
provided SinkConnectorConfig.
-     */
-    @Override
-    public void start() {
-        sinkHandler.start();
-    }
-
-
-    /**
-     * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method 
should be called for each ConnectRecord that needs to be processed.
-     *
-     * @param record the ConnectRecord to process
-     */
-    @Override
-    public void handle(ConnectRecord record) {
-        for (URI url : this.urls) {
-            // convert ConnectRecord to HttpConnectRecord
-            String type = String.format("%s.%s.%s",
-                this.connectorConfig.getConnectorName(), url.getScheme(),
-                this.connectorConfig.getWebhookConfig().isActivate() ? 
"webhook" : "common");
-            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
-            // handle the HttpConnectRecord
-            deliver(url, httpConnectRecord);
-        }
-    }
-
-
-    /**
-     * Processes HttpConnectRecord on specified URL while returning its own 
processing logic This method provides the retry power to process the
-     * HttpConnectRecord
-     *
-     * @param url               URI to which the HttpConnectRecord should be 
sent
-     * @param httpConnectRecord HttpConnectRecord to process
-     * @return processing chain
-     */
-    @Override
-    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord) {
-        // Only webhook mode needs to use the UUID to identify the request
-        String id = httpConnectRecord.getUuid();
-
-        // Build the retry policy
-        RetryPolicy<HttpResponse<Buffer>> retryPolicy = retryPolicyBuilder
-            .onSuccess(event -> {
-                if (connectorConfig.getWebhookConfig().isActivate()) {
-                    // convert the result to an HttpExportRecord
-                    HttpExportRecord exportRecord = 
covertToExportRecord(httpConnectRecord, event, event.getResult(), 
event.getException(), url, id);
-                    // add the data to the queue
-                    ((WebhookHttpSinkHandler) 
sinkHandler).getReceivedDataQueue().offer(exportRecord);
-                }
-            })
-            .onRetry(event -> {
-                if (log.isDebugEnabled()) {
-                    log.warn("Retrying the request to {} for the {} time. 
HttpConnectRecord= {}", url, event.getAttemptCount(), httpConnectRecord);
-                } else {
-                    log.warn("Retrying the request to {} for the {} time.", 
url, event.getAttemptCount());
-                }
-                if (connectorConfig.getWebhookConfig().isActivate()) {
-                    HttpExportRecord exportRecord =
-                        covertToExportRecord(httpConnectRecord, event, 
event.getLastResult(), event.getLastException(), url, id);
-                    ((WebhookHttpSinkHandler) 
sinkHandler).getReceivedDataQueue().offer(exportRecord);
-                }
-                // update the HttpConnectRecord
-                httpConnectRecord.setTime(LocalDateTime.now().toString());
-                httpConnectRecord.setUuid(UUID.randomUUID().toString());
-            })
-            .onFailure(event -> {
-                if (log.isDebugEnabled()) {
-                    log.error("Failed to send the request to {} after {} 
attempts. HttpConnectRecord= {}", url, event.getAttemptCount(),
-                        httpConnectRecord, event.getException());
-                } else {
-                    log.error("Failed to send the request to {} after {} 
attempts.", url, event.getAttemptCount(), event.getException());
-                }
-                if (connectorConfig.getWebhookConfig().isActivate()) {
-                    HttpExportRecord exportRecord = 
covertToExportRecord(httpConnectRecord, event, event.getResult(), 
event.getException(), url, id);
-                    ((WebhookHttpSinkHandler) 
sinkHandler).getReceivedDataQueue().offer(exportRecord);
-                }
-            }).build();
-
-        // Handle the HttpConnectRecord with retry
-        Failsafe.with(retryPolicy)
-            .getStageAsync(() -> sinkHandler.deliver(url, 
httpConnectRecord).toCompletionStage());
-
-        return null;
-    }
-
-    /**
-     * Converts the ExecutionCompletedEvent to an HttpExportRecord.
-     *
-     * @param httpConnectRecord HttpConnectRecord
-     * @param event             ExecutionEvent
-     * @param response          the response of the request, may be null
-     * @param e                 the exception thrown during the request, may 
be null
-     * @param url               the URL the request was sent to
-     * @param id                UUID
-     * @return the converted HttpExportRecord
-     */
-    private HttpExportRecord covertToExportRecord(HttpConnectRecord 
httpConnectRecord, ExecutionEvent event, HttpResponse<Buffer> response,
-        Throwable e, URI url, String id) {
-
-        HttpExportMetadata httpExportMetadata = HttpExportMetadata.builder()
-            .url(url.toString())
-            .code(response != null ? response.statusCode() : -1)
-            .message(response != null ? response.statusMessage() : 
e.getMessage())
-            .receivedTime(LocalDateTime.now())
-            .uuid(httpConnectRecord.getUuid())
-            .retriedBy(event.getAttemptCount() > 1 ? id : null)
-            .retryNum(event.getAttemptCount() - 1).build();
-
-        return new HttpExportRecord(httpExportMetadata, response == null ? 
null : response.bodyAsString());
-    }
-
-    /**
-     * Cleans up and releases resources used by the HTTP/HTTPS handler.
-     */
-    @Override
-    public void stop() {
-        sinkHandler.stop();
-    }
-}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
new file mode 100644
index 000000000..36d01115b
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler;
+
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * AbstractHttpSinkHandler is an abstract class that provides a base 
implementation for HttpSinkHandler.
+ */
+public abstract class AbstractHttpSinkHandler implements HttpSinkHandler {
+
+    private final SinkConnectorConfig sinkConnectorConfig;
+
+    private final List<URI> urls;
+
+    protected AbstractHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) 
{
+        this.sinkConnectorConfig = sinkConnectorConfig;
+        // Initialize URLs
+        String[] urlStrings = sinkConnectorConfig.getUrls();
+        this.urls = Arrays.stream(urlStrings)
+            .map(URI::create)
+            .collect(Collectors.toList());
+    }
+
+    public SinkConnectorConfig getSinkConnectorConfig() {
+        return sinkConnectorConfig;
+    }
+
+    public List<URI> getUrls() {
+        return urls;
+    }
+
+    /**
+     * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method 
should be called for each ConnectRecord that needs to be processed.
+     *
+     * @param record the ConnectRecord to process
+     */
+    @Override
+    public void handle(ConnectRecord record) {
+        // build attributes
+        Map<String, Object> attributes = new ConcurrentHashMap<>();
+        attributes.put(MultiHttpRequestContext.NAME, new 
MultiHttpRequestContext(urls.size()));
+
+        // send the record to all URLs
+        for (URI url : urls) {
+            // convert ConnectRecord to HttpConnectRecord
+            String type = String.format("%s.%s.%s",
+                this.sinkConnectorConfig.getConnectorName(), url.getScheme(),
+                this.sinkConnectorConfig.getWebhookConfig().isActivate() ? 
"webhook" : "common");
+            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
+
+            // add retry event to attributes
+            HttpRetryEvent retryEvent = new HttpRetryEvent();
+            
retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
+            attributes.put(HttpRetryEvent.PREFIX + 
httpConnectRecord.getHttpRecordId(), retryEvent);
+
+            // deliver the record
+            deliver(url, httpConnectRecord, attributes);
+        }
+    }
+
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
similarity index 83%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
rename to 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
index 09fd66a76..1731809ab 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/HttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/HttpSinkHandler.java
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler;
 
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.net.URI;
+import java.util.Map;
 
 import io.vertx.core.Future;
 import io.vertx.core.buffer.Buffer;
@@ -32,14 +33,14 @@ import io.vertx.ext.web.client.HttpResponse;
  *
  * <p>Any class that needs to process ConnectRecords via HTTP or HTTPS should 
implement this interface.
  * Implementing classes must provide implementations for the {@link #start()}, 
{@link #handle(ConnectRecord)},
- * {@link #deliver(URI, HttpConnectRecord)}, and {@link #stop()} methods.</p>
+ * {@link #deliver(URI, HttpConnectRecord, Map)}, and {@link #stop()} 
methods.</p>
  *
  * <p>Implementing classes should ensure thread safety and handle HTTP/HTTPS 
communication efficiently.
  * The {@link #start()} method initializes any necessary resources for 
HTTP/HTTPS communication. The {@link #handle(ConnectRecord)} method processes a
- * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, 
HttpConnectRecord)} method processes HttpConnectRecord on specified URL
- * while returning its own processing logic {@link #stop()} method releases 
any resources used for HTTP/HTTPS communication.</p>
+ * ConnectRecord by sending it over HTTP or HTTPS. The {@link #deliver(URI, 
HttpConnectRecord, Map)} method processes HttpConnectRecord on specified
+ * URL while returning its own processing logic {@link #stop()} method 
releases any resources used for HTTP/HTTPS communication.</p>
  *
- * <p>It's recommended to handle exceptions gracefully within the {@link 
#deliver(URI, HttpConnectRecord)} method
+ * <p>It's recommended to handle exceptions gracefully within the {@link 
#deliver(URI, HttpConnectRecord, Map)} method
  * to prevent message loss or processing interruptions.</p>
  */
 public interface HttpSinkHandler {
@@ -62,9 +63,10 @@ public interface HttpSinkHandler {
      *
      * @param url               URI to which the HttpConnectRecord should be 
sent
      * @param httpConnectRecord HttpConnectRecord to process
+     * @param attributes        additional attributes to be used in processing
      * @return processing chain
      */
-    Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord);
+    Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes);
 
     /**
      * Cleans up and releases resources used by the HTTP/HTTPS handler. This 
method should be called when the handler is no longer needed.
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
similarity index 57%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
rename to 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index 4bc365a13..090784745 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
 
 import org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
 import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
+import 
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
 import org.apache.eventmesh.connector.http.util.HttpUtils;
 import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.net.URI;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.vertx.core.Future;
@@ -60,22 +60,13 @@ import lombok.extern.slf4j.Slf4j;
  */
 @Slf4j
 @Getter
-public class CommonHttpSinkHandler implements HttpSinkHandler {
-
-    private final SinkConnectorConfig connectorConfig;
-
-    private final List<URI> urls;
+public class CommonHttpSinkHandler extends AbstractHttpSinkHandler {
 
     private WebClient webClient;
 
 
     public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
-        this.connectorConfig = sinkConnectorConfig;
-        // Initialize URLs
-        String[] urlStrings = sinkConnectorConfig.getUrls();
-        this.urls = Arrays.stream(urlStrings)
-            .map(URI::create)
-            .collect(Collectors.toList());
+        super(sinkConnectorConfig);
     }
 
     /**
@@ -91,41 +82,57 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
      * Initializes the WebClient with the provided configuration options.
      */
     private void doInitWebClient() {
+        SinkConnectorConfig sinkConnectorConfig = getSinkConnectorConfig();
         final Vertx vertx = Vertx.vertx();
         WebClientOptions options = new WebClientOptions()
-            .setKeepAlive(this.connectorConfig.isKeepAlive())
-            .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 
1000)
-            .setIdleTimeout(this.connectorConfig.getIdleTimeout())
+            .setKeepAlive(sinkConnectorConfig.isKeepAlive())
+            .setKeepAliveTimeout(sinkConnectorConfig.getKeepAliveTimeout() / 
1000)
+            .setIdleTimeout(sinkConnectorConfig.getIdleTimeout())
             .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)
-            .setConnectTimeout(this.connectorConfig.getConnectionTimeout())
-            .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize());
+            .setConnectTimeout(sinkConnectorConfig.getConnectionTimeout())
+            .setMaxPoolSize(sinkConnectorConfig.getMaxConnectionPoolSize());
         this.webClient = WebClient.create(vertx, options);
     }
 
     /**
-     * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method 
should be called for each ConnectRecord that needs to be processed.
+     * Processes HttpConnectRecord on specified URL while returning its own 
processing logic. This method sends the HttpConnectRecord to the specified
+     * URL using the WebClient.
      *
-     * @param record the ConnectRecord to process
+     * @param url               URI to which the HttpConnectRecord should be 
sent
+     * @param httpConnectRecord HttpConnectRecord to process
+     * @param attributes        additional attributes to be used in processing
+     * @return processing chain
      */
     @Override
-    public void handle(ConnectRecord record) {
-        for (URI url : this.urls) {
-            // convert ConnectRecord to HttpConnectRecord
-            String type = String.format("%s.%s.%s", 
connectorConfig.getConnectorName(), url.getScheme(), "common");
-            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
-            // get timestamp and offset
-            Long timestamp = httpConnectRecord.getData().getTimestamp();
-            Map<String, ?> offset = null;
-            try {
-                // May throw NullPointerException.
-                offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
-            } catch (NullPointerException e) {
-                // ignore null pointer exception
-            }
-            final Map<String, ?> finalOffset = offset;
-            Future<HttpResponse<Buffer>> responseFuture = deliver(url, 
httpConnectRecord);
-            responseFuture.onSuccess(res -> {
+    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes) {
+        // create headers
+        MultiMap headers = HttpHeaders.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
+            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
+
+        // get timestamp and offset
+        Long timestamp = httpConnectRecord.getData().getTimestamp();
+        Map<String, ?> offset = null;
+        try {
+            // May throw NullPointerException.
+            offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+        } catch (NullPointerException e) {
+            // ignore null pointer exception
+        }
+        final Map<String, ?> finalOffset = offset;
+
+        // send the request
+        return this.webClient.post(url.getPath())
+            .host(url.getHost())
+            .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
+            .putHeaders(headers)
+            .ssl(Objects.equals(url.getScheme(), "https"))
+            .sendJson(httpConnectRecord)
+            .onSuccess(res -> {
                 log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, finalOffset);
+
+                Exception e = null;
+
                 // log the response
                 if (HttpUtils.is2xxSuccessful(res.statusCode())) {
                     if (log.isDebugEnabled()) {
@@ -135,7 +142,6 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
                         log.info("Received successful response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
                             finalOffset);
                     }
-                    
record.getCallback().onSuccess(convertToSendResult(record));
                 } else {
                     if (log.isDebugEnabled()) {
                         log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}, responseBody={}",
@@ -144,14 +150,96 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
                         log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
                             finalOffset);
                     }
-                    record.getCallback()
-                        .onException(buildSendExceptionContext(record, new 
RuntimeException("HTTP response code: " + res.statusCode())));
+
+                    e = new RuntimeException("Unexpected HTTP response code: " 
+ res.statusCode());
                 }
+
+                // try callback
+                tryCallback(httpConnectRecord, e, attributes);
             }).onFailure(err -> {
                 log.error("Request failed to send. Record: timestamp={}, 
offset={}", timestamp, finalOffset, err);
-                
record.getCallback().onException(buildSendExceptionContext(record, err));
+
+                // try callback
+                tryCallback(httpConnectRecord, err, attributes);
             });
+    }
+
+    /**
+     * Records the outcome of one request and fires the ConnectRecord callback once no requests remain outstanding.
+     *
+     * @param httpConnectRecord the HttpConnectRecord whose request just finished
+     * @param e                 the failure of the request (send error or non-2xx response), or null on success
+     * @param attributes        shared attributes holding the retry event and the MultiHttpRequestContext
+     */
+    private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, 
Map<String, Object> attributes) {
+        // store this attempt's outcome (e, possibly null) on the request's retry event
+        HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, 
httpConnectRecord, e);
+
+        // count the request as finished if it succeeded or has reached its retry limit
+        MultiHttpRequestContext multiHttpRequestContext = 
getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
+
+        if (multiHttpRequestContext.getRemainingRequests() == 0) {
+            // no request is outstanding any more: report the aggregate result
+            ConnectRecord record = httpConnectRecord.getData();
+            if (record.getCallback() == null) {
+                if (log.isDebugEnabled()) {
+                    log.warn("ConnectRecord callback is null. Ignoring 
callback. {}", record);
+                } else {
+                    log.warn("ConnectRecord callback is null. Ignoring 
callback.");
+                }
+                return;
+            }
+
+            HttpRetryEvent lastFailedEvent = 
multiHttpRequestContext.getLastFailedEvent();
+            if (lastFailedEvent == null) {
+                // no request recorded a final failure
+                record.getCallback().onSuccess(convertToSendResult(record));
+            } else {
+                // a request failed for good: report the exception held by the recorded failed event
+                
record.getCallback().onException(buildSendExceptionContext(record, 
lastFailedEvent.getLastException()));
+            }
+        }
+    }
+
+    /**
+     * Looks up the retry event registered for this HttpConnectRecord and stores the latest request outcome on it.
+     *
+     * @param attributes        the attributes map, read under the key HttpRetryEvent.PREFIX + httpRecordId
+     * @param httpConnectRecord the HttpConnectRecord whose httpRecordId selects the retry event
+     * @param e                 the exception from the latest attempt, or null if it succeeded
+     * @return the updated retry event
+     */
+    private HttpRetryEvent getAndUpdateRetryEvent(Map<String, Object> 
attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
+        // NOTE(review): no null check; an attributes map lacking this entry makes the next statement throw a NullPointerException
+        HttpRetryEvent retryEvent = (HttpRetryEvent) 
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+        // overwrite the previous outcome; a null e clears an earlier failure
+        retryEvent.setLastException(e);
+        return retryEvent;
+    }
+
+
+    /**
+     * Fetches the MultiHttpRequestContext from the attributes and marks one request as finished when its outcome is final.
+     *
+     * @param attributes the attributes map, read under the key MultiHttpRequestContext.NAME
+     * @param retryEvent the retry event of the request that just completed an attempt
+     * @return the updated multi http request context
+     */
+    private MultiHttpRequestContext 
getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, 
HttpRetryEvent retryEvent) {
+        // the context is stored once in the attributes map under a fixed key
+        MultiHttpRequestContext multiHttpRequestContext = 
(MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
+
+        if (retryEvent.getLastException() == null || 
retryEvent.isMaxRetriesReached()) {
+            // final outcome (success, or failure with the retry limit reached): this request is no longer outstanding
+            multiHttpRequestContext.decrementRemainingRequests();
+
+            // remember the event only when the final outcome is a failure
+            if (retryEvent.getLastException() != null) {
+                multiHttpRequestContext.setLastFailedEvent(retryEvent);
+            }
         }
+
+        return multiHttpRequestContext;
     }
 
     private SendResult convertToSendResult(ConnectRecord record) {
@@ -174,30 +262,6 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
     }
 
 
-    /**
-     * Processes HttpConnectRecord on specified URL while returning its own 
processing logic. This method sends the HttpConnectRecord to the specified
-     * URL using the WebClient.
-     *
-     * @param url               URI to which the HttpConnectRecord should be 
sent
-     * @param httpConnectRecord HttpConnectRecord to process
-     * @return processing chain
-     */
-    @Override
-    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord) {
-        // create headers
-        MultiMap headers = HttpHeaders.headers()
-            .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
-            .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
-        // send the request
-        return this.webClient.post(url.getPath())
-            .host(url.getHost())
-            .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
-            .putHeaders(headers)
-            .ssl(Objects.equals(url.getScheme(), "https"))
-            .sendJson(httpConnectRecord);
-    }
-
-
     /**
      * Cleans up and releases resources used by the HTTP/HTTPS handler.
      */
@@ -209,6 +273,4 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
             log.warn("WebClient is null, ignore.");
         }
     }
-
-
 }
\ No newline at end of file
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
new file mode 100644
index 000000000..268d0a0d6
--- /dev/null
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.connector.http.sink.handler.impl;
+
+import org.apache.eventmesh.connector.http.sink.config.HttpRetryConfig;
+import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
+import 
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
+import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
+import org.apache.eventmesh.connector.http.util.HttpUtils;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+
+import io.vertx.core.Future;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+
+import lombok.extern.slf4j.Slf4j;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
+
+
+/**
+ * HttpSinkHandlerRetryWrapper wraps another HttpSinkHandler and adds retry functionality for failed HTTP requests.
+ */
+@Slf4j
+public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {
+
+    private final HttpRetryConfig httpRetryConfig;
+
+    private final HttpSinkHandler sinkHandler;
+
+    public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
+        super(sinkConnectorConfig);
+        this.sinkHandler = sinkHandler;
+        this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+    }
+
+    /**
+     * Starts the wrapped handler; this wrapper only delegates.
+     */
+    @Override
+    public void start() {
+        sinkHandler.start();
+    }
+
+
+    /**
+     * Delivers the HttpConnectRecord to the specified URL through the wrapped handler under a retry policy: a ConnectException is
+     * retried, and so is a non-2xx response when retryOnNonSuccess is enabled.
+     *
+     * @param url               URI to which the HttpConnectRecord should be sent
+     * @param httpConnectRecord HttpConnectRecord to process
+     * @param attributes        additional attributes to pass to the processing chain; must hold the record's HttpRetryEvent
+     * @return processing chain reflecting the retry-governed outcome of the delivery; never null
+     */
+    @Override
+    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes) {
+
+        // Build the retry policy
+        RetryPolicy<HttpResponse<Buffer>> retryPolicy = RetryPolicy.<HttpResponse<Buffer>>builder()
+            .handleIf(e -> e instanceof ConnectException)
+            .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
+            .withMaxRetries(httpRetryConfig.getMaxRetries())
+            .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
+            .onRetry(event -> {
+                if (log.isDebugEnabled()) {
+                    log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
+                } else {
+                    log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
+                }
+                // update the retry event
+                HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+                retryEvent.increaseCurrentRetries();
+            })
+            .onFailure(event -> {
+                if (log.isDebugEnabled()) {
+                    log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
+                        httpConnectRecord, event.getException());
+                } else {
+                    log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
+                }
+            }).build();
+
+        // Run the delivery under the retry policy and hand the outcome back as a Vert.x Future.
+        // The interface contract is to return the processing chain, so callers may chain on the
+        // result; returning null here would make any such caller fail with a NullPointerException.
+        return Future.fromCompletionStage(Failsafe.with(retryPolicy)
+            .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes).toCompletionStage()));
+    }
+
+
+    /**
+     * Stops the wrapped handler so it can release the resources it uses.
+     */
+    @Override
+    public void stop() {
+        sinkHandler.stop();
+    }
+}
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
similarity index 82%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
rename to 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index 4e64126a9..ff8f69d45 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.eventmesh.connector.http.sink.handle;
+package org.apache.eventmesh.connector.http.sink.handler.impl;
 
 import org.apache.eventmesh.common.exception.EventMeshException;
 import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
@@ -25,13 +25,14 @@ import 
org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
-import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 
 import org.apache.commons.lang3.StringUtils;
 
 import java.net.URI;
 import java.time.LocalDateTime;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -61,8 +62,6 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class WebhookHttpSinkHandler extends CommonHttpSinkHandler {
 
-    private final SinkConnectorConfig sinkConnectorConfig;
-
     // the configuration for webhook
     private final HttpWebhookConfig webhookConfig;
 
@@ -86,7 +85,7 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
 
     public WebhookHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) {
         super(sinkConnectorConfig);
-        this.sinkConnectorConfig = sinkConnectorConfig;
+
         this.webhookConfig = sinkConnectorConfig.getWebhookConfig();
         int maxQueueSize = this.webhookConfig.getMaxStorageSize();
         this.receivedDataQueue = new 
SynchronizedCircularFifoQueue<>(maxQueueSize);
@@ -94,9 +93,6 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
         doInitExportServer();
     }
 
-    public SynchronizedCircularFifoQueue<HttpExportRecord> 
getReceivedDataQueue() {
-        return receivedDataQueue;
-    }
 
     /**
      * Initialize the server for exporting the received data
@@ -202,22 +198,6 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
         });
     }
 
-    /**
-     * Processes a ConnectRecord by sending it over HTTP or HTTPS. This method 
should be called for each ConnectRecord that needs to be processed.
-     *
-     * @param record the ConnectRecord to process
-     */
-    @Override
-    public void handle(ConnectRecord record) {
-        for (URI url : super.getUrls()) {
-            // convert ConnectRecord to HttpConnectRecord
-            String type = String.format("%s.%s.%s", 
this.getConnectorConfig().getConnectorName(), url.getScheme(), "webhook");
-            HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
-            // handle the HttpConnectRecord
-            deliver(url, httpConnectRecord);
-        }
-    }
-
 
     /**
      * Processes HttpConnectRecord on specified URL while returning its own 
processing logic This method sends the HttpConnectRecord to the specified
@@ -225,30 +205,27 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
      *
      * @param url               URI to which the HttpConnectRecord should be 
sent
      * @param httpConnectRecord HttpConnectRecord to process
+     * @param attributes        additional attributes to be used in processing
      * @return processing chain
      */
     @Override
-    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord) {
+    public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes) {
         // send the request
-        Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, 
httpConnectRecord);
+        Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, 
httpConnectRecord, attributes);
         // store the received data
         return responseFuture.onComplete(arr -> {
-            // If open retry, return directly and handled by 
RetryHttpSinkHandler
-            if (sinkConnectorConfig.getRetryConfig().getMaxRetries() > 0) {
-                return;
+            // get retryEvent from attributes
+            HttpRetryEvent retryEvent = (HttpRetryEvent) 
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+
+            HttpResponse<Buffer> response = null;
+            if (arr.succeeded()) {
+                response = arr.result();
+            } else {
+                retryEvent.setLastException(arr.cause());
             }
-            // create ExportMetadataBuilder
-            HttpResponse<Buffer> response = arr.succeeded() ? arr.result() : 
null;
-
-            HttpExportMetadata httpExportMetadata = 
HttpExportMetadata.builder()
-                .url(url.toString())
-                .code(response != null ? response.statusCode() : -1)
-                .message(response != null ? response.statusMessage() : 
arr.cause().getMessage())
-                .receivedTime(LocalDateTime.now())
-                .retriedBy(null)
-                .uuid(httpConnectRecord.getUuid())
-                .retryNum(0)
-                .build();
+
+            // create ExportMetadata
+            HttpExportMetadata httpExportMetadata = 
buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent);
 
             // create ExportRecord
             HttpExportRecord exportRecord = new 
HttpExportRecord(httpExportMetadata, arr.succeeded() ? 
arr.result().bodyAsString() : null);
@@ -257,6 +234,38 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
         });
     }
 
+    /**
+     * Builds the HttpExportMetadata object based on the response, HttpConnectRecord, and HttpRetryEvent.
+     *
+     * @param url               the URI to which the HttpConnectRecord was sent
+     * @param response          the response received from the URI, or null when there is none
+     * @param httpConnectRecord the HttpConnectRecord that was sent
+     * @param retryEvent        the HttpRetryEvent for this record; a non-null last exception is read and then cleared here
+     * @return the HttpExportMetadata object; its code is -1 when response is null
+     */
+    private HttpExportMetadata buildHttpExportMetadata(URI url, 
HttpResponse<Buffer> response, HttpConnectRecord httpConnectRecord,
+        HttpRetryEvent retryEvent) {
+
+        String msg = null;
+        // message precedence: lastException > response status message > null (a consumed exception is reset)
+        if (retryEvent.getLastException() != null) {
+            msg = retryEvent.getLimitedExceptionMessage();
+            retryEvent.setLastException(null);
+        } else if (response != null) {
+            msg = response.statusMessage();
+        }
+
+        return HttpExportMetadata.builder()
+            .url(url.toString())
+            .code(response != null ? response.statusCode() : -1)
+            .message(msg)
+            .receivedTime(LocalDateTime.now())
+            .httpRecordId(httpConnectRecord.getHttpRecordId())
+            .recordId(httpConnectRecord.getData().getRecordId())
+            .retryNum(retryEvent.getCurrentRetries())
+            .build();
+    }
+
 
     /**
      * Cleans up and releases resources used by the HTTP/HTTPS handler.
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 3e724627c..7ddba511c 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -86,7 +86,7 @@ public class HttpSinkConnectorTest {
                     JSONObject requestBody = 
JSON.parseObject(httpRequest.getBodyAsString());
                     return HttpResponse.response()
                         .withContentType(MediaType.APPLICATION_JSON)
-                        .withStatusCode(200)
+                        .withStatusCode(HttpStatus.SC_OK)
                         .withBody(new JSONObject()
                             .fluentPut("code", 0)
                             .fluentPut("message", "success")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to