This is an automated email from the ASF dual-hosted git repository.

xiatian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new fe3d56b11 [ISSUE #5105] Fix the retry mechanism of the 
HttpSinkConnector (#5106)
fe3d56b11 is described below

commit fe3d56b113e1b41c108731d0925746ae4edb0b91
Author: Zaki <[email protected]>
AuthorDate: Mon Oct 28 11:37:19 2024 +0800

    [ISSUE #5105] Fix the retry mechanism of the HttpSinkConnector (#5106)
---
 .../{HttpRetryEvent.java => HttpAttemptEvent.java} | 74 +++++++++++++++++-----
 .../http/sink/data/MultiHttpRequestContext.java    | 16 +++--
 .../http/sink/handler/AbstractHttpSinkHandler.java |  9 ++-
 .../sink/handler/impl/CommonHttpSinkHandler.java   | 47 +++++---------
 .../handler/impl/HttpSinkHandlerRetryWrapper.java  | 57 ++++++++---------
 .../sink/handler/impl/WebhookHttpSinkHandler.java  | 27 ++++----
 .../connector/http/sink/HttpSinkConnectorTest.java |  2 -
 7 files changed, 130 insertions(+), 102 deletions(-)

diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java
similarity index 52%
rename from 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
rename to 
eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java
index 4b229f983..8163852f8 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java
@@ -17,41 +17,82 @@
 
 package org.apache.eventmesh.connector.http.sink.data;
 
-import lombok.Data;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * Single HTTP retry event
+ * Single HTTP attempt event
  */
-@Data
-public class HttpRetryEvent {
+public class HttpAttemptEvent {
 
-    public static final String PREFIX = "http-retry-event-";
+    public static final String PREFIX = "http-attempt-event-";
 
-    private String parentId;
+    private final int maxAttempts;
 
-    private int maxRetries;
-
-    private int currentRetries;
+    private final AtomicInteger attempts;
 
     private Throwable lastException;
 
+
+    public HttpAttemptEvent(int maxAttempts) {
+        this.maxAttempts = maxAttempts;
+        this.attempts = new AtomicInteger(0);
+    }
+
+    /**
+     * Increment the attempts
+     */
+    public void incrementAttempts() {
+        attempts.incrementAndGet();
+    }
+
     /**
-     * Increase the current retries by 1
+     * Update the event, incrementing the attempts and setting the last 
exception
+     *
+     * @param exception the exception to update, can be null
      */
-    public void increaseCurrentRetries() {
-        this.currentRetries++;
+    public void updateEvent(Throwable exception) {
+        // increment the attempts
+        incrementAttempts();
+
+        // update the last exception
+        lastException = exception;
     }
 
     /**
-     * Check if the current retries is greater than or equal to the max retries
-     * @return true if the current retries is greater than or equal to the max 
retries
+     * Check if the attempts are less than the maximum attempts
+     *
+     * @return true if the attempts are less than the maximum attempts, false 
otherwise
      */
-    public boolean isMaxRetriesReached() {
-        return this.currentRetries >= this.maxRetries;
+    public boolean canAttempt() {
+        return attempts.get() < maxAttempts;
+    }
+
+    public boolean isComplete() {
+        if (attempts.get() == 0) {
+            // No start yet
+            return false;
+        }
+
+        // If no attempt can be made or the last exception is null, the event 
completed
+        return !canAttempt() || lastException == null;
+    }
+
+
+    public int getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    public int getAttempts() {
+        return attempts.get();
+    }
+
+    public Throwable getLastException() {
+        return lastException;
     }
 
     /**
      * Get the limited exception message with the default limit of 256
+     *
      * @return the limited exception message
      */
     public String getLimitedExceptionMessage() {
@@ -60,6 +101,7 @@ public class HttpRetryEvent {
 
     /**
      * Get the limited exception message with the specified limit
+     *
      * @param maxLimit the maximum limit of the exception message
      * @return the limited exception message
      */
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
index 67ab94381..66f5d0e7e 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
@@ -34,10 +34,9 @@ public class MultiHttpRequestContext {
 
     /**
      * The last failed event.
-     * If there are no retries or retries are not enabled, it will be null.
      * If retries occur but still fail, it will be logged, and only the last 
one will be retained.
      */
-    private HttpRetryEvent lastFailedEvent;
+    private HttpAttemptEvent lastFailedEvent;
 
     public MultiHttpRequestContext(int remainingEvents) {
         this.remainingRequests = new AtomicInteger(remainingEvents);
@@ -50,15 +49,24 @@ public class MultiHttpRequestContext {
         remainingRequests.decrementAndGet();
     }
 
+    /**
+     * Check if all requests have been processed.
+     *
+     * @return true if all requests have been processed, false otherwise.
+     */
+    public boolean isAllRequestsProcessed() {
+        return remainingRequests.get() == 0;
+    }
+
     public int getRemainingRequests() {
         return remainingRequests.get();
     }
 
-    public HttpRetryEvent getLastFailedEvent() {
+    public HttpAttemptEvent getLastFailedEvent() {
         return lastFailedEvent;
     }
 
-    public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
+    public void setLastFailedEvent(HttpAttemptEvent lastFailedEvent) {
         this.lastFailedEvent = lastFailedEvent;
     }
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
index 5c868f4aa..28ba79112 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -18,8 +18,8 @@
 package org.apache.eventmesh.connector.http.sink.handler;
 
 import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
@@ -75,10 +75,9 @@ public abstract class AbstractHttpSinkHandler implements 
HttpSinkHandler {
                 this.sinkConnectorConfig.getWebhookConfig().isActivate() ? 
"webhook" : "common");
             HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
 
-            // add retry event to attributes
-            HttpRetryEvent retryEvent = new HttpRetryEvent();
-            
retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
-            attributes.put(HttpRetryEvent.PREFIX + 
httpConnectRecord.getHttpRecordId(), retryEvent);
+            // add AttemptEvent to the attributes
+            HttpAttemptEvent attemptEvent = new 
HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
+            attributes.put(HttpAttemptEvent.PREFIX + 
httpConnectRecord.getHttpRecordId(), attemptEvent);
 
             // deliver the record
             deliver(url, httpConnectRecord, attributes, record);
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index e88707482..61bdc9f31 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -19,8 +19,8 @@ package org.apache.eventmesh.connector.http.sink.handler.impl;
 
 import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
 import 
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
 import org.apache.eventmesh.connector.http.util.HttpUtils;
@@ -176,13 +176,14 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
      * @param attributes        additional attributes to be used in processing
      */
     private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, 
Map<String, Object> attributes, ConnectRecord record) {
-        // get the retry event
-        HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, 
httpConnectRecord, e);
+        // get and update the attempt event
+        HttpAttemptEvent attemptEvent = (HttpAttemptEvent) 
attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+        attemptEvent.updateEvent(e);
 
-        // get the multi http request context
-        MultiHttpRequestContext multiHttpRequestContext = 
getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
+        // get and update the multiHttpRequestContext
+        MultiHttpRequestContext multiHttpRequestContext = 
getAndUpdateMultiHttpRequestContext(attributes, attemptEvent);
 
-        if (multiHttpRequestContext.getRemainingRequests() == 0) {
+        if (multiHttpRequestContext.isAllRequestsProcessed()) {
             // do callback
             if (record.getCallback() == null) {
                 if (log.isDebugEnabled()) {
@@ -193,7 +194,8 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
                 return;
             }
 
-            HttpRetryEvent lastFailedEvent = 
multiHttpRequestContext.getLastFailedEvent();
+            // get the last failed event
+            HttpAttemptEvent lastFailedEvent = 
multiHttpRequestContext.getLastFailedEvent();
             if (lastFailedEvent == null) {
                 // success
                 record.getCallback().onSuccess(convertToSendResult(record));
@@ -204,41 +206,26 @@ public class CommonHttpSinkHandler extends 
AbstractHttpSinkHandler {
         }
     }
 
-    /**
-     * Gets and updates the retry event based on the provided attributes and 
HttpConnectRecord.
-     *
-     * @param attributes        the attributes to use
-     * @param httpConnectRecord the HttpConnectRecord to use
-     * @param e                 the exception thrown during the request, may 
be null
-     * @return the updated retry event
-     */
-    private HttpRetryEvent getAndUpdateRetryEvent(Map<String, Object> 
attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
-        // get the retry event
-        HttpRetryEvent retryEvent = (HttpRetryEvent) 
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
-        // update the retry event
-        retryEvent.setLastException(e);
-        return retryEvent;
-    }
-
 
     /**
      * Gets and updates the multi http request context based on the provided 
attributes and HttpConnectRecord.
      *
-     * @param attributes the attributes to use
-     * @param retryEvent the retry event to use
+     * @param attributes   the attributes to use
+     * @param attemptEvent the HttpAttemptEvent to use
      * @return the updated multi http request context
      */
-    private MultiHttpRequestContext 
getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, 
HttpRetryEvent retryEvent) {
+    private MultiHttpRequestContext 
getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, 
HttpAttemptEvent attemptEvent) {
         // get the multi http request context
         MultiHttpRequestContext multiHttpRequestContext = 
(MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
 
-        if (retryEvent.getLastException() == null || 
retryEvent.isMaxRetriesReached()) {
+        // Check if the current attempted event has completed
+        if (attemptEvent.isComplete()) {
             // decrement the counter
             multiHttpRequestContext.decrementRemainingRequests();
 
-            // try set failed event
-            if (retryEvent.getLastException() != null) {
-                multiHttpRequestContext.setLastFailedEvent(retryEvent);
+            if (attemptEvent.getLastException() != null) {
+                // if all attempts are exhausted, set the last failed event
+                multiHttpRequestContext.setLastFailedEvent(attemptEvent);
             }
         }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
index 820b46296..050839451 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -20,7 +20,6 @@ package org.apache.eventmesh.connector.http.sink.handler.impl;
 import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
 import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import 
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
 import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
 import org.apache.eventmesh.connector.http.util.HttpUtils;
@@ -51,10 +50,38 @@ public class HttpSinkHandlerRetryWrapper extends 
AbstractHttpSinkHandler {
 
     private final HttpSinkHandler sinkHandler;
 
+    private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;
+
     public HttpSinkHandlerRetryWrapper(SinkConnectorConfig 
sinkConnectorConfig, HttpSinkHandler sinkHandler) {
         super(sinkConnectorConfig);
         this.sinkHandler = sinkHandler;
         this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+        this.retryPolicy = buildRetryPolicy();
+    }
+
+    private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
+        return RetryPolicy.<HttpResponse<Buffer>>builder()
+            .handleIf(e -> e instanceof ConnectException)
+            .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() 
&& !HttpUtils.is2xxSuccessful(response.statusCode()))
+            .withMaxRetries(httpRetryConfig.getMaxRetries())
+            .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
+            .onRetry(event -> {
+                if (log.isDebugEnabled()) {
+                    log.warn("Failed to deliver message after {} attempts. 
Retrying in {} ms. Error: {}",
+                        event.getAttemptCount(), 
httpRetryConfig.getInterval(), event.getLastException());
+                } else {
+                    log.warn("Failed to deliver message after {} attempts. 
Retrying in {} ms.",
+                        event.getAttemptCount(), 
httpRetryConfig.getInterval());
+                }
+            }).onFailure(event -> {
+                if (log.isDebugEnabled()) {
+                    log.error("Failed to deliver message after {} attempts. 
Error: {}",
+                        event.getAttemptCount(), event.getException());
+                } else {
+                    log.error("Failed to deliver message after {} attempts.",
+                        event.getAttemptCount());
+                }
+            }).build();
     }
 
     /**
@@ -78,36 +105,8 @@ public class HttpSinkHandlerRetryWrapper extends 
AbstractHttpSinkHandler {
     @Override
     public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord 
httpConnectRecord, Map<String, Object> attributes,
         ConnectRecord connectRecord) {
-
-        // Build the retry policy
-        RetryPolicy<HttpResponse<Buffer>> retryPolicy = 
RetryPolicy.<HttpResponse<Buffer>>builder()
-            .handleIf(e -> e instanceof ConnectException)
-            .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() 
&& !HttpUtils.is2xxSuccessful(response.statusCode()))
-            .withMaxRetries(httpRetryConfig.getMaxRetries())
-            .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
-            .onRetry(event -> {
-                if (log.isDebugEnabled()) {
-                    log.warn("Retrying the request to {} for the {} time. {}", 
url, event.getAttemptCount(), httpConnectRecord);
-                } else {
-                    log.warn("Retrying the request to {} for the {} time.", 
url, event.getAttemptCount());
-                }
-                // update the retry event
-                HttpRetryEvent retryEvent = (HttpRetryEvent) 
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
-                retryEvent.increaseCurrentRetries();
-            })
-            .onFailure(event -> {
-                if (log.isDebugEnabled()) {
-                    log.error("Failed to send the request to {} after {} 
attempts. {}", url, event.getAttemptCount(),
-                        httpConnectRecord, event.getException());
-                } else {
-                    log.error("Failed to send the request to {} after {} 
attempts.", url, event.getAttemptCount(), event.getException());
-                }
-            }).build();
-
-        // Handle the ConnectRecord with retry policy
         Failsafe.with(retryPolicy)
             .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, 
attributes, connectRecord).toCompletionStage());
-
         return null;
     }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index 7edd84a96..0751918ee 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -21,11 +21,11 @@ import 
org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig;
 import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
 import org.apache.eventmesh.common.exception.EventMeshException;
 import 
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
 import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import org.apache.commons.lang3.StringUtils;
@@ -216,18 +216,14 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
         Future<HttpResponse<Buffer>> responseFuture = super.deliver(url, 
httpConnectRecord, attributes, connectRecord);
         // store the received data
         return responseFuture.onComplete(arr -> {
-            // get tryEvent from attributes
-            HttpRetryEvent retryEvent = (HttpRetryEvent) 
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+            // get HttpAttemptEvent
+            HttpAttemptEvent attemptEvent = (HttpAttemptEvent) 
attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
 
-            HttpResponse<Buffer> response = null;
-            if (arr.succeeded()) {
-                response = arr.result();
-            } else {
-                retryEvent.setLastException(arr.cause());
-            }
+            // get the response
+            HttpResponse<Buffer> response = arr.succeeded() ? arr.result() : 
null;
 
             // create ExportMetadata
-            HttpExportMetadata httpExportMetadata = 
buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent);
+            HttpExportMetadata httpExportMetadata = 
buildHttpExportMetadata(url, response, httpConnectRecord, attemptEvent);
 
             // create ExportRecord
             HttpExportRecord exportRecord = new 
HttpExportRecord(httpExportMetadata, arr.succeeded() ? 
arr.result().bodyAsString() : null);
@@ -242,17 +238,16 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
      * @param url               the URI to which the HttpConnectRecord was sent
      * @param response          the response received from the URI
      * @param httpConnectRecord the HttpConnectRecord that was sent
-     * @param retryEvent        the SingleHttpRetryEvent that was used for 
retries
+     * @param attemptEvent      the HttpAttemptEvent that was used to send the 
HttpConnectRecord
      * @return the HttpExportMetadata object
      */
     private HttpExportMetadata buildHttpExportMetadata(URI url, 
HttpResponse<Buffer> response, HttpConnectRecord httpConnectRecord,
-        HttpRetryEvent retryEvent) {
+        HttpAttemptEvent attemptEvent) {
 
         String msg = null;
         // order of precedence: lastException > response > null
-        if (retryEvent.getLastException() != null) {
-            msg = retryEvent.getLimitedExceptionMessage();
-            retryEvent.setLastException(null);
+        if (attemptEvent.getLastException() != null) {
+            msg = attemptEvent.getLimitedExceptionMessage();
         } else if (response != null) {
             msg = response.statusMessage();
         }
@@ -263,7 +258,7 @@ public class WebhookHttpSinkHandler extends 
CommonHttpSinkHandler {
             .message(msg)
             .receivedTime(LocalDateTime.now())
             .recordId(httpConnectRecord.getHttpRecordId())
-            .retryNum(retryEvent.getCurrentRetries())
+            .retryNum(attemptEvent.getAttempts() - 1)
             .build();
     }
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 5f65f0749..be2b52e73 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -83,14 +83,12 @@ public class HttpSinkConnectorTest {
                 httpRequest -> {
                     // Increase the number of requests received
                     counter.incrementAndGet();
-                    JSONObject requestBody = 
JSON.parseObject(httpRequest.getBodyAsString());
                     return HttpResponse.response()
                         .withContentType(MediaType.APPLICATION_JSON)
                         .withStatusCode(HttpStatus.SC_OK)
                         .withBody(new JSONObject()
                             .fluentPut("code", 0)
                             .fluentPut("message", "success")
-                            .fluentPut("data", 
requestBody.getJSONObject("data").get("data"))
                             .toJSONString()
                         ); // .withDelay(TimeUnit.SECONDS, 10);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to