This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new a6018dd28 [ISSUE #5069] Enhancement for http source/sink connector 
(#5070)
a6018dd28 is described below

commit a6018dd28ddcc4b5a1888617a2e8990b0969a815
Author: mike_xwm <[email protected]>
AuthorDate: Wed Aug 7 17:18:08 2024 +0800

    [ISSUE #5069] Enhancement for http source/sink connector (#5070)
    
    * [ISSUE #5069] Enhancement for http source/sink connector
    
    * update http source connector & config
    
    * fix checkstyle error
---
 .../connector/http/SourceConnectorConfig.java      |   3 +
 .../http/sink/handle/CommonHttpSinkHandler.java    | 101 +++++++++++++--------
 .../connector/http/source/HttpSourceConnector.java |  36 +++++---
 .../http/source/protocol/impl/CommonProtocol.java  |  15 ++-
 4 files changed, 100 insertions(+), 55 deletions(-)

diff --git 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 4f69f5504..b7f075e6d 100644
--- 
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++ 
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -55,4 +55,7 @@ public class SourceConnectorConfig {
 
     // extra config, e.g. GitHub secret
     private Map<String, String> extraConfig = new HashMap<>();
+
+    // data consistency enabled, default true
+    private boolean dataConsistencyEnabled = true;
 }
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
index c6cc90e0e..4bc365a13 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
@@ -21,6 +21,8 @@ import 
org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
 import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
 import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
 import org.apache.eventmesh.connector.http.util.HttpUtils;
+import 
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
 import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
 
 import java.net.URI;
@@ -111,14 +113,70 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
             // convert ConnectRecord to HttpConnectRecord
             String type = String.format("%s.%s.%s", 
connectorConfig.getConnectorName(), url.getScheme(), "common");
             HttpConnectRecord httpConnectRecord = 
HttpConnectRecord.convertConnectRecord(record, type);
-            deliver(url, httpConnectRecord);
+            // get timestamp and offset
+            Long timestamp = httpConnectRecord.getData().getTimestamp();
+            Map<String, ?> offset = null;
+            try {
+                // May throw NullPointerException.
+                offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+            } catch (NullPointerException e) {
+                // ignore null pointer exception
+            }
+            final Map<String, ?> finalOffset = offset;
+            Future<HttpResponse<Buffer>> responseFuture = deliver(url, 
httpConnectRecord);
+            responseFuture.onSuccess(res -> {
+                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, finalOffset);
+                // log the response
+                if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received successful response: 
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
+                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
+                    } else {
+                        log.info("Received successful response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+                            finalOffset);
+                    }
+                    
record.getCallback().onSuccess(convertToSendResult(record));
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}, responseBody={}",
+                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
+                    } else {
+                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+                            finalOffset);
+                    }
+                    record.getCallback()
+                        .onException(buildSendExceptionContext(record, new 
RuntimeException("HTTP response code: " + res.statusCode())));
+                }
+            }).onFailure(err -> {
+                log.error("Request failed to send. Record: timestamp={}, 
offset={}", timestamp, finalOffset, err);
+                
record.getCallback().onException(buildSendExceptionContext(record, err));
+            });
+        }
+    }
+
+    private SendResult convertToSendResult(ConnectRecord record) {
+        SendResult result = new SendResult();
+        result.setMessageId(record.getRecordId());
+        if 
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) 
{
+            result.setTopic(record.getExtension("topic"));
+        }
+        return result;
+    }
+
+    private SendExceptionContext buildSendExceptionContext(ConnectRecord 
record, Throwable e) {
+        SendExceptionContext sendExceptionContext = new SendExceptionContext();
+        sendExceptionContext.setMessageId(record.getRecordId());
+        sendExceptionContext.setCause(e);
+        if 
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) 
{
+            sendExceptionContext.setTopic(record.getExtension("topic"));
         }
+        return sendExceptionContext;
     }
 
 
     /**
-     * Processes HttpConnectRecord on specified URL while returning its own 
processing logic.
-     * This method sends the HttpConnectRecord to the specified URL using the 
WebClient.
+     * Processes HttpConnectRecord on specified URL while returning its own 
processing logic. This method sends the HttpConnectRecord to the specified
+     * URL using the WebClient.
      *
      * @param url               URI to which the HttpConnectRecord should be 
sent
      * @param httpConnectRecord HttpConnectRecord to process
@@ -130,48 +188,13 @@ public class CommonHttpSinkHandler implements 
HttpSinkHandler {
         MultiMap headers = HttpHeaders.headers()
             .set(HttpHeaderNames.CONTENT_TYPE, "application/json; 
charset=utf-8")
             .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
-
-        // get timestamp and offset
-        Long timestamp = httpConnectRecord.getData().getTimestamp();
-        Map<String, ?> offset = null;
-        try {
-            // May throw NullPointerException.
-            offset = ((HttpRecordOffset) 
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
-        } catch (NullPointerException e) {
-            // ignore null pointer exception
-        }
-        final Map<String, ?> finalOffset = offset;
-
         // send the request
         return this.webClient.post(url.getPath())
             .host(url.getHost())
             .port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), 
"https") ? 443 : 80) : url.getPort())
             .putHeaders(headers)
             .ssl(Objects.equals(url.getScheme(), "https"))
-            .sendJson(httpConnectRecord)
-            .onSuccess(res -> {
-                log.info("Request sent successfully. Record: timestamp={}, 
offset={}", timestamp, finalOffset);
-                // log the response
-                if (HttpUtils.is2xxSuccessful(res.statusCode())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Received successful response: 
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
-                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
-                    } else {
-                        log.info("Received successful response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
-                            finalOffset);
-                    }
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}, responseBody={}",
-                            res.statusCode(), timestamp, finalOffset, 
res.bodyAsString());
-                    } else {
-                        log.warn("Received non-2xx response: statusCode={}. 
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
-                            finalOffset);
-                    }
-                }
-
-            })
-            .onFailure(err -> log.error("Request failed to send. Record: 
timestamp={}, offset={}", timestamp, finalOffset, err));
+            .sendJson(httpConnectRecord);
     }
 
 
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 1ca325b18..2b2a01a9d 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
@@ -41,6 +42,7 @@ import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.handler.LoggerHandler;
 
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
@@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     private int batchSize;
 
+    private Route route;
+
     private Protocol protocol;
 
     private HttpServer server;
 
+    @Getter
     private volatile boolean started = false;
 
+    @Getter
     private volatile boolean destroyed = false;
 
-    public boolean isStarted() {
-        return started;
-    }
-
-    public boolean isDestroyed() {
-        return destroyed;
-    }
-
 
     @Override
     public Class<? extends Config> configClass() {
@@ -106,7 +104,7 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
         final Vertx vertx = Vertx.vertx();
         final Router router = Router.router(vertx);
-        final Route route = router.route()
+        route = router.route()
             .path(this.sourceConfig.connectorConfig.getPath())
             .handler(LoggerHandler.create());
 
@@ -136,7 +134,15 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     @Override
     public void commit(ConnectRecord record) {
-
+        if (this.route != null && 
sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+            this.route.handler(ctx -> {
+                // Return 200 OK
+                ctx.response()
+                    .putHeader("content-type", "application/json")
+                    .setStatusCode(HttpResponseStatus.OK.code())
+                    .end("{\"status\":\"success\",\"recordId\":\"" + 
record.getRecordId() + "\"}");
+            });
+        }
     }
 
     @Override
@@ -146,7 +152,15 @@ public class HttpSourceConnector implements Source, 
ConnectorCreateService<Sourc
 
     @Override
     public void onException(ConnectRecord record) {
-
+        if (this.route != null) {
+            this.route.failureHandler(ctx -> {
+                log.error("Failed to handle the request, recordId {}. ", 
record.getRecordId(), ctx.failure());
+                // Return Bad Response
+                ctx.response()
+                    
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+                    .end("{\"status\":\"failed\",\"recordId\":\"" + 
record.getRecordId() + "\"}");
+            });
+        }
     }
 
     @Override
diff --git 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
index 80e4f0a75..738f04523 100644
--- 
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
+++ 
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -45,6 +45,8 @@ public class CommonProtocol implements Protocol {
 
     public static final String PROTOCOL_NAME = "Common";
 
+    private SourceConnectorConfig sourceConnectorConfig;
+
     /**
      * Initialize the protocol
      *
@@ -52,7 +54,7 @@ public class CommonProtocol implements Protocol {
      */
     @Override
     public void initialize(SourceConnectorConfig sourceConnectorConfig) {
-
+        this.sourceConnectorConfig = sourceConnectorConfig;
     }
 
     /**
@@ -77,10 +79,13 @@ public class CommonProtocol implements Protocol {
                     throw new IllegalStateException("Failed to store the 
request.");
                 }
 
-                // Return 200 OK
-                ctx.response()
-                    .setStatusCode(HttpResponseStatus.OK.code())
-                    .end(CommonResponse.success().toJsonStr());
+                if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
+                    // Return 200 OK
+                    ctx.response()
+                        .setStatusCode(HttpResponseStatus.OK.code())
+                        .end(CommonResponse.success().toJsonStr());
+                }
+
             })
             .failureHandler(ctx -> {
                 log.error("Failed to handle the request. ", ctx.failure());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to