This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new a6018dd28 [ISSUE #5069] Enhancement for http source/sink connector
(#5070)
a6018dd28 is described below
commit a6018dd28ddcc4b5a1888617a2e8990b0969a815
Author: mike_xwm <[email protected]>
AuthorDate: Wed Aug 7 17:18:08 2024 +0800
[ISSUE #5069] Enhancement for http source/sink connector (#5070)
* [ISSUE #5069] Enhancement for http source/sink connector
* update http source connector & config
* fix checkstyle error
---
.../connector/http/SourceConnectorConfig.java | 3 +
.../http/sink/handle/CommonHttpSinkHandler.java | 101 +++++++++++++--------
.../connector/http/source/HttpSourceConnector.java | 36 +++++---
.../http/source/protocol/impl/CommonProtocol.java | 15 ++-
4 files changed, 100 insertions(+), 55 deletions(-)
diff --git
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
index 4f69f5504..b7f075e6d 100644
---
a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
+++
b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/http/SourceConnectorConfig.java
@@ -55,4 +55,7 @@ public class SourceConnectorConfig {
// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();
+
+ // data consistency enabled, default true
+ private boolean dataConsistencyEnabled = true;
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
index c6cc90e0e..4bc365a13 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java
@@ -21,6 +21,8 @@ import
org.apache.eventmesh.common.remote.offset.http.HttpRecordOffset;
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.util.HttpUtils;
+import
org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import java.net.URI;
@@ -111,14 +113,70 @@ public class CommonHttpSinkHandler implements
HttpSinkHandler {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s",
connectorConfig.getConnectorName(), url.getScheme(), "common");
HttpConnectRecord httpConnectRecord =
HttpConnectRecord.convertConnectRecord(record, type);
- deliver(url, httpConnectRecord);
+ // get timestamp and offset
+ Long timestamp = httpConnectRecord.getData().getTimestamp();
+ Map<String, ?> offset = null;
+ try {
+ // May throw NullPointerException.
+ offset = ((HttpRecordOffset)
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
+ } catch (NullPointerException e) {
+ // ignore null pointer exception
+ }
+ final Map<String, ?> finalOffset = offset;
+ Future<HttpResponse<Buffer>> responseFuture = deliver(url,
httpConnectRecord);
+ responseFuture.onSuccess(res -> {
+ log.info("Request sent successfully. Record: timestamp={},
offset={}", timestamp, finalOffset);
+ // log the response
+ if (HttpUtils.is2xxSuccessful(res.statusCode())) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received successful response:
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
+ res.statusCode(), timestamp, finalOffset,
res.bodyAsString());
+ } else {
+ log.info("Received successful response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+ finalOffset);
+ }
+
record.getCallback().onSuccess(convertToSendResult(record));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}, responseBody={}",
+ res.statusCode(), timestamp, finalOffset,
res.bodyAsString());
+ } else {
+ log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
+ finalOffset);
+ }
+ record.getCallback()
+ .onException(buildSendExceptionContext(record, new
RuntimeException("HTTP response code: " + res.statusCode())));
+ }
+ }).onFailure(err -> {
+ log.error("Request failed to send. Record: timestamp={},
offset={}", timestamp, finalOffset, err);
+
record.getCallback().onException(buildSendExceptionContext(record, err));
+ });
+ }
+ }
+
+ private SendResult convertToSendResult(ConnectRecord record) {
+ SendResult result = new SendResult();
+ result.setMessageId(record.getRecordId());
+ if
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic")))
{
+ result.setTopic(record.getExtension("topic"));
+ }
+ return result;
+ }
+
+ private SendExceptionContext buildSendExceptionContext(ConnectRecord
record, Throwable e) {
+ SendExceptionContext sendExceptionContext = new SendExceptionContext();
+ sendExceptionContext.setMessageId(record.getRecordId());
+ sendExceptionContext.setCause(e);
+ if
(org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic")))
{
+ sendExceptionContext.setTopic(record.getExtension("topic"));
}
+ return sendExceptionContext;
}
/**
- * Processes HttpConnectRecord on specified URL while returning its own
processing logic.
- * This method sends the HttpConnectRecord to the specified URL using the
WebClient.
+ * Processes HttpConnectRecord on specified URL while returning its own
processing logic. This method sends the HttpConnectRecord to the specified
+ * URL using the WebClient.
*
* @param url URI to which the HttpConnectRecord should be
sent
* @param httpConnectRecord HttpConnectRecord to process
@@ -130,48 +188,13 @@ public class CommonHttpSinkHandler implements
HttpSinkHandler {
MultiMap headers = HttpHeaders.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "application/json;
charset=utf-8")
.set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");
-
- // get timestamp and offset
- Long timestamp = httpConnectRecord.getData().getTimestamp();
- Map<String, ?> offset = null;
- try {
- // May throw NullPointerException.
- offset = ((HttpRecordOffset)
httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
- } catch (NullPointerException e) {
- // ignore null pointer exception
- }
- final Map<String, ?> finalOffset = offset;
-
// send the request
return this.webClient.post(url.getPath())
.host(url.getHost())
.port(url.getPort() == -1 ? (Objects.equals(url.getScheme(),
"https") ? 443 : 80) : url.getPort())
.putHeaders(headers)
.ssl(Objects.equals(url.getScheme(), "https"))
- .sendJson(httpConnectRecord)
- .onSuccess(res -> {
- log.info("Request sent successfully. Record: timestamp={},
offset={}", timestamp, finalOffset);
- // log the response
- if (HttpUtils.is2xxSuccessful(res.statusCode())) {
- if (log.isDebugEnabled()) {
- log.debug("Received successful response:
statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
- res.statusCode(), timestamp, finalOffset,
res.bodyAsString());
- } else {
- log.info("Received successful response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
- finalOffset);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}, responseBody={}",
- res.statusCode(), timestamp, finalOffset,
res.bodyAsString());
- } else {
- log.warn("Received non-2xx response: statusCode={}.
Record: timestamp={}, offset={}", res.statusCode(), timestamp,
- finalOffset);
- }
- }
-
- })
- .onFailure(err -> log.error("Request failed to send. Record:
timestamp={}, offset={}", timestamp, finalOffset, err));
+ .sendJson(httpConnectRecord);
}
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
index 1ca325b18..2b2a01a9d 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
@@ -41,6 +42,7 @@ import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
private int batchSize;
+ private Route route;
+
private Protocol protocol;
private HttpServer server;
+ @Getter
private volatile boolean started = false;
+ @Getter
private volatile boolean destroyed = false;
- public boolean isStarted() {
- return started;
- }
-
- public boolean isDestroyed() {
- return destroyed;
- }
-
@Override
public Class<? extends Config> configClass() {
@@ -106,7 +104,7 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
- final Route route = router.route()
+ route = router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.handler(LoggerHandler.create());
@@ -136,7 +134,15 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
@Override
public void commit(ConnectRecord record) {
-
+ if (this.route != null &&
sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
+ this.route.handler(ctx -> {
+ // Return 200 OK
+ ctx.response()
+ .putHeader("content-type", "application/json")
+ .setStatusCode(HttpResponseStatus.OK.code())
+ .end("{\"status\":\"success\",\"recordId\":\"" +
record.getRecordId() + "\"}");
+ });
+ }
}
@Override
@@ -146,7 +152,15 @@ public class HttpSourceConnector implements Source,
ConnectorCreateService<Sourc
@Override
public void onException(ConnectRecord record) {
-
+ if (this.route != null) {
+ this.route.failureHandler(ctx -> {
+ log.error("Failed to handle the request, recordId {}. ",
record.getRecordId(), ctx.failure());
+ // Return Bad Response
+ ctx.response()
+
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+ .end("{\"status\":\"failed\",\"recordId\":\"" +
record.getRecordId() + "\"}");
+ });
+ }
}
@Override
diff --git
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
index 80e4f0a75..738f04523 100644
---
a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
+++
b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/protocol/impl/CommonProtocol.java
@@ -45,6 +45,8 @@ public class CommonProtocol implements Protocol {
public static final String PROTOCOL_NAME = "Common";
+ private SourceConnectorConfig sourceConnectorConfig;
+
/**
* Initialize the protocol
*
@@ -52,7 +54,7 @@ public class CommonProtocol implements Protocol {
*/
@Override
public void initialize(SourceConnectorConfig sourceConnectorConfig) {
-
+ this.sourceConnectorConfig = sourceConnectorConfig;
}
/**
@@ -77,10 +79,13 @@ public class CommonProtocol implements Protocol {
throw new IllegalStateException("Failed to store the
request.");
}
- // Return 200 OK
- ctx.response()
- .setStatusCode(HttpResponseStatus.OK.code())
- .end(CommonResponse.success().toJsonStr());
+ if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
+ // Return 200 OK
+ ctx.response()
+ .setStatusCode(HttpResponseStatus.OK.code())
+ .end(CommonResponse.success().toJsonStr());
+ }
+
})
.failureHandler(ctx -> {
log.error("Failed to handle the request. ", ctx.failure());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]