This is an automated email from the ASF dual-hosted git repository.
xiatian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new fe3d56b11 [ISSUE #5105] Fix the retry mechanism of the HttpSinkConnector (#5106)
fe3d56b11 is described below
commit fe3d56b113e1b41c108731d0925746ae4edb0b91
Author: Zaki <[email protected]>
AuthorDate: Mon Oct 28 11:37:19 2024 +0800
[ISSUE #5105] Fix the retry mechanism of the HttpSinkConnector (#5106)
---
.../{HttpRetryEvent.java => HttpAttemptEvent.java} | 74 +++++++++++++++++-----
.../http/sink/data/MultiHttpRequestContext.java | 16 +++--
.../http/sink/handler/AbstractHttpSinkHandler.java | 9 ++-
.../sink/handler/impl/CommonHttpSinkHandler.java | 47 +++++---------
.../handler/impl/HttpSinkHandlerRetryWrapper.java | 57 ++++++++---------
.../sink/handler/impl/WebhookHttpSinkHandler.java | 27 ++++----
.../connector/http/sink/HttpSinkConnectorTest.java | 2 -
7 files changed, 130 insertions(+), 102 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java
similarity index 52%
rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java
index 4b229f983..8163852f8 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java
@@ -17,41 +17,82 @@
package org.apache.eventmesh.connector.http.sink.data;
-import lombok.Data;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * Single HTTP retry event
+ * Single HTTP attempt event
*/
-@Data
-public class HttpRetryEvent {
+public class HttpAttemptEvent {
- public static final String PREFIX = "http-retry-event-";
+ public static final String PREFIX = "http-attempt-event-";
- private String parentId;
+ private final int maxAttempts;
- private int maxRetries;
-
- private int currentRetries;
+ private final AtomicInteger attempts;
private Throwable lastException;
+
+ public HttpAttemptEvent(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ this.attempts = new AtomicInteger(0);
+ }
+
+ /**
+ * Increment the attempts
+ */
+ public void incrementAttempts() {
+ attempts.incrementAndGet();
+ }
+
/**
- * Increase the current retries by 1
+ * Update the event, incrementing the attempts and setting the last
exception
+ *
+ * @param exception the exception to update, can be null
*/
- public void increaseCurrentRetries() {
- this.currentRetries++;
+ public void updateEvent(Throwable exception) {
+ // increment the attempts
+ incrementAttempts();
+
+ // update the last exception
+ lastException = exception;
}
/**
- * Check if the current retries is greater than or equal to the max retries
- * @return true if the current retries is greater than or equal to the max
retries
+ * Check if the attempts are less than the maximum attempts
+ *
+ * @return true if the attempts are less than the maximum attempts, false
otherwise
*/
- public boolean isMaxRetriesReached() {
- return this.currentRetries >= this.maxRetries;
+ public boolean canAttempt() {
+ return attempts.get() < maxAttempts;
+ }
+
+ public boolean isComplete() {
+ if (attempts.get() == 0) {
+ // No start yet
+ return false;
+ }
+
+ // If no attempt can be made or the last exception is null, the event
completed
+ return !canAttempt() || lastException == null;
+ }
+
+
+ public int getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ public int getAttempts() {
+ return attempts.get();
+ }
+
+ public Throwable getLastException() {
+ return lastException;
}
/**
* Get the limited exception message with the default limit of 256
+ *
* @return the limited exception message
*/
public String getLimitedExceptionMessage() {
@@ -60,6 +101,7 @@ public class HttpRetryEvent {
/**
* Get the limited exception message with the specified limit
+ *
* @param maxLimit the maximum limit of the exception message
* @return the limited exception message
*/
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
index 67ab94381..66f5d0e7e 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java
@@ -34,10 +34,9 @@ public class MultiHttpRequestContext {
/**
* The last failed event.
- * If there are no retries or retries are not enabled, it will be null.
* If retries occur but still fail, it will be logged, and only the last
one will be retained.
*/
- private HttpRetryEvent lastFailedEvent;
+ private HttpAttemptEvent lastFailedEvent;
public MultiHttpRequestContext(int remainingEvents) {
this.remainingRequests = new AtomicInteger(remainingEvents);
@@ -50,15 +49,24 @@ public class MultiHttpRequestContext {
remainingRequests.decrementAndGet();
}
+ /**
+ * Check if all requests have been processed.
+ *
+ * @return true if all requests have been processed, false otherwise.
+ */
+ public boolean isAllRequestsProcessed() {
+ return remainingRequests.get() == 0;
+ }
+
public int getRemainingRequests() {
return remainingRequests.get();
}
- public HttpRetryEvent getLastFailedEvent() {
+ public HttpAttemptEvent getLastFailedEvent() {
return lastFailedEvent;
}
- public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
+ public void setLastFailedEvent(HttpAttemptEvent lastFailedEvent) {
this.lastFailedEvent = lastFailedEvent;
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
index 5c868f4aa..28ba79112 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java
@@ -18,8 +18,8 @@
package org.apache.eventmesh.connector.http.sink.handler;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
+import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
@@ -75,10 +75,9 @@ public abstract class AbstractHttpSinkHandler implements
HttpSinkHandler {
this.sinkConnectorConfig.getWebhookConfig().isActivate() ?
"webhook" : "common");
HttpConnectRecord httpConnectRecord =
HttpConnectRecord.convertConnectRecord(record, type);
- // add retry event to attributes
- HttpRetryEvent retryEvent = new HttpRetryEvent();
-
retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
- attributes.put(HttpRetryEvent.PREFIX +
httpConnectRecord.getHttpRecordId(), retryEvent);
+ // add AttemptEvent to the attributes
+ HttpAttemptEvent attemptEvent = new
HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
+ attributes.put(HttpAttemptEvent.PREFIX +
httpConnectRecord.getHttpRecordId(), attemptEvent);
// deliver the record
deliver(url, httpConnectRecord, attributes, record);
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
index e88707482..61bdc9f31 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java
@@ -19,8 +19,8 @@ package org.apache.eventmesh.connector.http.sink.handler.impl;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
import
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
@@ -176,13 +176,14 @@ public class CommonHttpSinkHandler extends
AbstractHttpSinkHandler {
* @param attributes additional attributes to be used in processing
*/
private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e,
Map<String, Object> attributes, ConnectRecord record) {
- // get the retry event
- HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes,
httpConnectRecord, e);
+ // get and update the attempt event
+ HttpAttemptEvent attemptEvent = (HttpAttemptEvent)
attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ attemptEvent.updateEvent(e);
- // get the multi http request context
- MultiHttpRequestContext multiHttpRequestContext =
getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
+ // get and update the multiHttpRequestContext
+ MultiHttpRequestContext multiHttpRequestContext =
getAndUpdateMultiHttpRequestContext(attributes, attemptEvent);
- if (multiHttpRequestContext.getRemainingRequests() == 0) {
+ if (multiHttpRequestContext.isAllRequestsProcessed()) {
// do callback
if (record.getCallback() == null) {
if (log.isDebugEnabled()) {
@@ -193,7 +194,8 @@ public class CommonHttpSinkHandler extends
AbstractHttpSinkHandler {
return;
}
- HttpRetryEvent lastFailedEvent =
multiHttpRequestContext.getLastFailedEvent();
+ // get the last failed event
+ HttpAttemptEvent lastFailedEvent =
multiHttpRequestContext.getLastFailedEvent();
if (lastFailedEvent == null) {
// success
record.getCallback().onSuccess(convertToSendResult(record));
@@ -204,41 +206,26 @@ public class CommonHttpSinkHandler extends
AbstractHttpSinkHandler {
}
}
- /**
- * Gets and updates the retry event based on the provided attributes and
HttpConnectRecord.
- *
- * @param attributes the attributes to use
- * @param httpConnectRecord the HttpConnectRecord to use
- * @param e the exception thrown during the request, may
be null
- * @return the updated retry event
- */
- private HttpRetryEvent getAndUpdateRetryEvent(Map<String, Object>
attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
- // get the retry event
- HttpRetryEvent retryEvent = (HttpRetryEvent)
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
- // update the retry event
- retryEvent.setLastException(e);
- return retryEvent;
- }
-
/**
* Gets and updates the multi http request context based on the provided
attributes and HttpConnectRecord.
*
- * @param attributes the attributes to use
- * @param retryEvent the retry event to use
+ * @param attributes the attributes to use
+ * @param attemptEvent the HttpAttemptEvent to use
* @return the updated multi http request context
*/
- private MultiHttpRequestContext
getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes,
HttpRetryEvent retryEvent) {
+ private MultiHttpRequestContext
getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes,
HttpAttemptEvent attemptEvent) {
// get the multi http request context
MultiHttpRequestContext multiHttpRequestContext =
(MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);
- if (retryEvent.getLastException() == null ||
retryEvent.isMaxRetriesReached()) {
+ // Check if the current attempted event has completed
+ if (attemptEvent.isComplete()) {
// decrement the counter
multiHttpRequestContext.decrementRemainingRequests();
- // try set failed event
- if (retryEvent.getLastException() != null) {
- multiHttpRequestContext.setLastFailedEvent(retryEvent);
+ if (attemptEvent.getLastException() != null) {
+ // if all attempts are exhausted, set the last failed event
+ multiHttpRequestContext.setLastFailedEvent(attemptEvent);
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
index 820b46296..050839451 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java
@@ -20,7 +20,6 @@ package org.apache.eventmesh.connector.http.sink.handler.impl;
import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import
org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
@@ -51,10 +50,38 @@ public class HttpSinkHandlerRetryWrapper extends
AbstractHttpSinkHandler {
private final HttpSinkHandler sinkHandler;
+ private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;
+
public HttpSinkHandlerRetryWrapper(SinkConnectorConfig
sinkConnectorConfig, HttpSinkHandler sinkHandler) {
super(sinkConnectorConfig);
this.sinkHandler = sinkHandler;
this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
+ this.retryPolicy = buildRetryPolicy();
+ }
+
+ private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
+ return RetryPolicy.<HttpResponse<Buffer>>builder()
+ .handleIf(e -> e instanceof ConnectException)
+ .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess()
&& !HttpUtils.is2xxSuccessful(response.statusCode()))
+ .withMaxRetries(httpRetryConfig.getMaxRetries())
+ .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
+ .onRetry(event -> {
+ if (log.isDebugEnabled()) {
+ log.warn("Failed to deliver message after {} attempts.
Retrying in {} ms. Error: {}",
+ event.getAttemptCount(),
httpRetryConfig.getInterval(), event.getLastException());
+ } else {
+ log.warn("Failed to deliver message after {} attempts.
Retrying in {} ms.",
+ event.getAttemptCount(),
httpRetryConfig.getInterval());
+ }
+ }).onFailure(event -> {
+ if (log.isDebugEnabled()) {
+ log.error("Failed to deliver message after {} attempts.
Error: {}",
+ event.getAttemptCount(), event.getException());
+ } else {
+ log.error("Failed to deliver message after {} attempts.",
+ event.getAttemptCount());
+ }
+ }).build();
}
/**
@@ -78,36 +105,8 @@ public class HttpSinkHandlerRetryWrapper extends
AbstractHttpSinkHandler {
@Override
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord
httpConnectRecord, Map<String, Object> attributes,
ConnectRecord connectRecord) {
-
- // Build the retry policy
- RetryPolicy<HttpResponse<Buffer>> retryPolicy =
RetryPolicy.<HttpResponse<Buffer>>builder()
- .handleIf(e -> e instanceof ConnectException)
- .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess()
&& !HttpUtils.is2xxSuccessful(response.statusCode()))
- .withMaxRetries(httpRetryConfig.getMaxRetries())
- .withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
- .onRetry(event -> {
- if (log.isDebugEnabled()) {
- log.warn("Retrying the request to {} for the {} time. {}",
url, event.getAttemptCount(), httpConnectRecord);
- } else {
- log.warn("Retrying the request to {} for the {} time.",
url, event.getAttemptCount());
- }
- // update the retry event
- HttpRetryEvent retryEvent = (HttpRetryEvent)
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
- retryEvent.increaseCurrentRetries();
- })
- .onFailure(event -> {
- if (log.isDebugEnabled()) {
- log.error("Failed to send the request to {} after {}
attempts. {}", url, event.getAttemptCount(),
- httpConnectRecord, event.getException());
- } else {
- log.error("Failed to send the request to {} after {}
attempts.", url, event.getAttemptCount(), event.getException());
- }
- }).build();
-
- // Handle the ConnectRecord with retry policy
Failsafe.with(retryPolicy)
.getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord,
attributes, connectRecord).toCompletionStage());
-
return null;
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
index 7edd84a96..0751918ee 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java
@@ -21,11 +21,11 @@ import
org.apache.eventmesh.common.config.connector.http.HttpWebhookConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.common.exception.EventMeshException;
import
org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue;
+import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage;
-import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.commons.lang3.StringUtils;
@@ -216,18 +216,14 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
Future<HttpResponse<Buffer>> responseFuture = super.deliver(url,
httpConnectRecord, attributes, connectRecord);
// store the received data
return responseFuture.onComplete(arr -> {
- // get tryEvent from attributes
- HttpRetryEvent retryEvent = (HttpRetryEvent)
attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
+ // get HttpAttemptEvent
+ HttpAttemptEvent attemptEvent = (HttpAttemptEvent)
attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
- HttpResponse<Buffer> response = null;
- if (arr.succeeded()) {
- response = arr.result();
- } else {
- retryEvent.setLastException(arr.cause());
- }
+ // get the response
+ HttpResponse<Buffer> response = arr.succeeded() ? arr.result() :
null;
// create ExportMetadata
- HttpExportMetadata httpExportMetadata =
buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent);
+ HttpExportMetadata httpExportMetadata =
buildHttpExportMetadata(url, response, httpConnectRecord, attemptEvent);
// create ExportRecord
HttpExportRecord exportRecord = new
HttpExportRecord(httpExportMetadata, arr.succeeded() ?
arr.result().bodyAsString() : null);
@@ -242,17 +238,16 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
* @param url the URI to which the HttpConnectRecord was sent
* @param response the response received from the URI
* @param httpConnectRecord the HttpConnectRecord that was sent
- * @param retryEvent the SingleHttpRetryEvent that was used for
retries
+ * @param attemptEvent the HttpAttemptEvent that was used to send the
HttpConnectRecord
* @return the HttpExportMetadata object
*/
private HttpExportMetadata buildHttpExportMetadata(URI url,
HttpResponse<Buffer> response, HttpConnectRecord httpConnectRecord,
- HttpRetryEvent retryEvent) {
+ HttpAttemptEvent attemptEvent) {
String msg = null;
// order of precedence: lastException > response > null
- if (retryEvent.getLastException() != null) {
- msg = retryEvent.getLimitedExceptionMessage();
- retryEvent.setLastException(null);
+ if (attemptEvent.getLastException() != null) {
+ msg = attemptEvent.getLimitedExceptionMessage();
} else if (response != null) {
msg = response.statusMessage();
}
@@ -263,7 +258,7 @@ public class WebhookHttpSinkHandler extends
CommonHttpSinkHandler {
.message(msg)
.receivedTime(LocalDateTime.now())
.recordId(httpConnectRecord.getHttpRecordId())
- .retryNum(retryEvent.getCurrentRetries())
+ .retryNum(attemptEvent.getAttempts() - 1)
.build();
}
diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
index 5f65f0749..be2b52e73 100644
--- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
+++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java
@@ -83,14 +83,12 @@ public class HttpSinkConnectorTest {
httpRequest -> {
// Increase the number of requests received
counter.incrementAndGet();
- JSONObject requestBody =
JSON.parseObject(httpRequest.getBodyAsString());
return HttpResponse.response()
.withContentType(MediaType.APPLICATION_JSON)
.withStatusCode(HttpStatus.SC_OK)
.withBody(new JSONObject()
.fluentPut("code", 0)
.fluentPut("message", "success")
- .fluentPut("data",
requestBody.getJSONObject("data").get("data"))
.toJSONString()
); // .withDelay(TimeUnit.SECONDS, 10);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]