cnzakii commented on code in PR #4837: URL: https://github.com/apache/eventmesh/pull/4837#discussion_r1569809935
########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private WebClient webClient; + + private final String type; + + // store the received data, when webhook is enabled + private final BlockingQueue<JSONObject> receivedDataQueue; + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + SinkConnectorConfig.fillDefault(sinkConnectorConfig); + this.connectorConfig = sinkConnectorConfig; + this.receivedDataQueue = this.connectorConfig.isWebhook() ? new LinkedBlockingQueue<>() : null; + type = String.format("%s.%s.%s", + sinkConnectorConfig.getConnectorName(), + sinkConnectorConfig.isSsl() ? "https" : "http", + sinkConnectorConfig.isWebhook() ? "webhook" : "common"); + } + + /** + * Get the oldest data in the queue + * + * @return received data + */ + public Object getReceivedData() { + if (!this.connectorConfig.isWebhook()) { + return null; + } + return this.receivedDataQueue.poll(); + } + + /** + * Get all received data + * + * @return all received data + */ + public Object[] getAllReceivedData() { + if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) { + return new Object[0]; + } + Object[] arr = receivedDataQueue.toArray(); + receivedDataQueue.clear(); + return arr; + } + + + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + private void doInitWebClient() { + final Vertx vertx = Vertx.vertx(); + // TODO add more configurations + WebClientOptions options = new WebClientOptions() + .setDefaultHost(this.connectorConfig.getHost()) + .setDefaultPort(this.connectorConfig.getPort()) + .setSsl(this.connectorConfig.isSsl()) + .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()); + + this.webClient = WebClient.create(vertx, options); + } + + + @Override + public void handle(ConnectRecord record) { + // create headers + MultiMap headers = HttpHeaders.headers(); + headers.add(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8"); + headers.add(HttpHeaderNames.USER_AGENT, "EventMesh-Sink-Connector"); Review Comment: I'm going to delete this line of code because it's a "code smell". The correct way to write it is to set whether it is enabled through `setUserAgentEnabled()` of `WebClientOptions` (default: `true`), or to set the name and enable it through `setUserAgent` (default name: `Vert.x-WebClient/4.4.6`) ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -100,25 +100,25 @@ public void start() { private void doInitWebClient() { final Vertx vertx = Vertx.vertx(); - // TODO add more configurations WebClientOptions options = new WebClientOptions() .setDefaultHost(this.connectorConfig.getHost()) .setDefaultPort(this.connectorConfig.getPort()) .setSsl(this.connectorConfig.isSsl()) + .setKeepAlive(this.connectorConfig.isKeepAlive()) + .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000) .setIdleTimeout(this.connectorConfig.getIdleTimeout()) .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) - .setConnectTimeout(this.connectorConfig.getConnectionTimeout()); - + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()) + .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize()); this.webClient = WebClient.create(vertx, options); } @Override public void handle(ConnectRecord record) { // create headers - MultiMap headers = HttpHeaders.headers(); - headers.add(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8"); - headers.add(HttpHeaderNames.USER_AGENT, "EventMesh-Sink-Connector"); + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); Review Comment: This is a mistake. The correct value should be `HttpHeaderNames.CONTENT_TYPE`. I will modify it and submit it. ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private WebClient webClient; + + private final String type; + + // store the received data, when webhook is enabled + private final BlockingQueue<JSONObject> receivedDataQueue; + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + SinkConnectorConfig.fillDefault(sinkConnectorConfig); + this.connectorConfig = sinkConnectorConfig; + this.receivedDataQueue = this.connectorConfig.isWebhook() ? new LinkedBlockingQueue<>() : null; + type = String.format("%s.%s.%s", + sinkConnectorConfig.getConnectorName(), + sinkConnectorConfig.isSsl() ? "https" : "http", + sinkConnectorConfig.isWebhook() ? "webhook" : "common"); + } + + /** + * Get the oldest data in the queue + * + * @return received data + */ + public Object getReceivedData() { + if (!this.connectorConfig.isWebhook()) { + return null; + } + return this.receivedDataQueue.poll(); + } + + /** + * Get all received data + * + * @return all received data + */ + public Object[] getAllReceivedData() { + if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) { + return new Object[0]; + } + Object[] arr = receivedDataQueue.toArray(); + receivedDataQueue.clear(); + return arr; + } + + + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + private void doInitWebClient() { + final Vertx vertx = Vertx.vertx(); + // TODO add more configurations + WebClientOptions options = new WebClientOptions() + .setDefaultHost(this.connectorConfig.getHost()) + .setDefaultPort(this.connectorConfig.getPort()) + .setSsl(this.connectorConfig.isSsl()) + .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()); + + this.webClient = WebClient.create(vertx, options); + } + + + @Override + public void handle(ConnectRecord record) { + // create headers + MultiMap headers = HttpHeaders.headers(); + headers.add(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8"); + headers.add(HttpHeaderNames.USER_AGENT, "EventMesh-Sink-Connector"); Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/5cb55a84f5ea43e10c58613bd8327ec62518846e ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private WebClient webClient; + + private final String type; + + // store the received data, when webhook is enabled + private final BlockingQueue<JSONObject> receivedDataQueue; + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig); + this.connectorConfig = sinkConnectorConfig; + this.receivedDataQueue = this.connectorConfig.isWebhook() ? new LinkedBlockingQueue<>() : null; + type = String.format("%s.%s.%s", + sinkConnectorConfig.getConnectorName(), + sinkConnectorConfig.isSsl() ? "https" : "http", + sinkConnectorConfig.isWebhook() ? "webhook" : "common"); + } + + /** + * Get the oldest data in the queue + * + * @return received data + */ + public Object getReceivedData() { + if (!this.connectorConfig.isWebhook()) { + return null; + } + return this.receivedDataQueue.poll(); + } + + /** + * Get all received data + * + * @return all received data + */ + public Object[] getAllReceivedData() { + if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) { + return new Object[0]; + } + Object[] arr = receivedDataQueue.toArray(); + receivedDataQueue.clear(); + return arr; + } + + + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + private void doInitWebClient() { + final Vertx vertx = Vertx.vertx(); + WebClientOptions options = new WebClientOptions() + .setDefaultHost(this.connectorConfig.getHost()) + .setDefaultPort(this.connectorConfig.getPort()) + .setSsl(this.connectorConfig.isSsl()) + .setKeepAlive(this.connectorConfig.isKeepAlive()) + .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000) + .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()) + .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize()); + this.webClient = WebClient.create(vertx, options); + } + + + @Override + public void handle(ConnectRecord record) { + // create headers + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); + + // convert ConnectRecord to HttpConnectRecord + HttpConnectRecord httpConnectRecord = convertToHttpConnectRecord(record); + + // send the request + this.webClient.post(this.connectorConfig.getPath()) + .putHeaders(headers) + .sendJson(httpConnectRecord) + .onSuccess(res -> { + Long timestamp = record.getTimestamp(); + Map<String, ?> offset = record.getPosition().getOffset().getOffset(); + log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); + // Determine whether the status code is 200 + if (res.statusCode() == HttpResponseStatus.OK.code()) { + // store the received data, when webhook is enabled + if (this.connectorConfig.isWebhook()) { + String dataStr = res.body().toString(); + if (dataStr.isEmpty()) { + log.warn("Received data is empty."); + return; + } + JSONObject receivedData = JSON.parseObject(dataStr); Review Comment: >Serializing a response body and deserialize it again is a lot performance overhead. I think there's no need to output a warning when response body is empty. Ok, I will call only `res.bodyAsString()` or `res.bodyAsJsonObject()` method to serialize the response body. >By the way, the bodyAsString() method has a better performance than toString(). I don't really agree with you because method `bodyAsString()` essentially also calls method `toString()`   ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.connector; + +import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.Objects; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class HttpSinkConnector implements Sink { + + private HttpSinkConfig httpSinkConfig; + + private WebClient webClient; + + private volatile boolean isRunning = false; + + @Override + public Class<? extends Config> configClass() { + return HttpSinkConfig.class; + } + + @Override + public void init(Config config) throws Exception { + httpSinkConfig = (HttpSinkConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; + this.httpSinkConfig = (HttpSinkConfig) sinkConnectorContext.getSinkConfig(); + doInit(); + } + + @SneakyThrows + private void doInit() { + final Vertx vertx = Vertx.vertx(); + // TODO Add more configurations + WebClientOptions options = new WebClientOptions() Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/5cb55a84f5ea43e10c58613bd8327ec62518846e ########## eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml: ########## @@ -30,6 +30,6 @@ connectorConfig: port: 8987 path: /test ssl: false - idleTimeout: 5 + idleTimeout: 5000 webhookConfig: Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java: ########## @@ -120,14 +120,14 @@ private void sendMessage(ConnectRecord record) { .putHeader("Content-Type", "application/json; charset=utf-8") .sendJson(record, ar -> { if (ar.succeeded()) { + log.info("[HttpSinkConnector] Successfully send message via HTTP. Record: timestamp={}, offset={}", record.getTimestamp(), + record.getPosition().getOffset()); if (ar.result().statusCode() != HttpResponseStatus.OK.code()) { - log.error("[HttpSinkConnector] Failed to send message via HTTP. Response: {}", ar.result()); - } else { - log.info("[HttpSinkConnector] Successfully send message via HTTP. "); + log.error("[HttpSinkConnector] Unexpected response received. StatusCode: {}", ar.result().statusCode()); Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f ########## eventmesh-connectors/eventmesh-connector-http/src/main/resources/sink-config.yml: ########## @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pubSubConfig: + meshAddress: 127.0.0.1:10000 + subject: TopicTest + idc: FT + env: PRD + group: httpSink + appId: 5032 + userName: httpSinkUser + passWord: httpPassWord +connectorConfig: + connectorName: httpSink + host: 127.0.0.1 + port: 8987 + path: /test + ssl: false Review Comment: This is a good idea, but do we support both common HTTP and webhooks? If so, I think it can be configured like this: ``` webhook: true url: - https://webhook.foo.com/callback - http://webhook.bar.com:8080/callback - ... ``` ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java: ########## @@ -83,20 +83,28 @@ private void doInitCallbackServer() { .path(this.webhookConfig.getCallbackPath()) .method(HttpMethod.POST) .produces("application/json") - .handler(ctx -> { - JSONObject callbackData = JSON.parseObject(ctx.body().asString()); - // store callback data - if (!this.callbackQueue.offer(callbackData)) { - log.error("Callback data is full, discard the data. Data: {}", callbackData); + .handler(ctx -> ctx.request().body().onComplete(ar -> { + if (ar.succeeded()) { + JSONObject callbackData = JSON.parseObject(ar.result().toString()); + // store callback data + if (!this.callbackQueue.offer(callbackData)) { + log.error("Callback data is full, discard the data. Data: {}", callbackData); Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java: ########## @@ -83,20 +83,28 @@ private void doInitCallbackServer() { .path(this.webhookConfig.getCallbackPath()) .method(HttpMethod.POST) .produces("application/json") - .handler(ctx -> { - JSONObject callbackData = JSON.parseObject(ctx.body().asString()); - // store callback data - if (!this.callbackQueue.offer(callbackData)) { - log.error("Callback data is full, discard the data. Data: {}", callbackData); + .handler(ctx -> ctx.request().body().onComplete(ar -> { + if (ar.succeeded()) { + JSONObject callbackData = JSON.parseObject(ar.result().toString()); + // store callback data + if (!this.callbackQueue.offer(callbackData)) { + log.error("Callback data is full, discard the data. Data: {}", callbackData); + } else { + log.debug("Succeed to store callback data. Data: {}", callbackData); + } + // response 200 OK + ctx.response() + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8") Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + Review Comment: This is a very good suggestion, but I decided to separate the `webhook` from the`CommonHttpSinkHandler`.I want to make `CommonHttpSinkHandler` more "pure" and "generic", not more bloated! In `CommonHttpSinkHandler`, only the basic request sending functionality is retained, and it focuses only on sending the HTTP request and encapsulating the response into a `Future`, which is given to the other processor for processing. ``` @Override public void handle(ConnectRecord record) { for (URI url : this.urls) { // ............. send(url, httpConnectRecord); } } public Future<HttpResponse<Buffer>> send(URI url, HttpConnectRecord httpConnectRecord) { // ............. return this.webClient.post(url.getPath()) .host(url.getHost()) .port(url.getPort()) .putHeaders(headers) .ssl(Objects.equals(url.getScheme(), "https")) .sendJson(httpConnectRecord) .onSuccess(res -> { // ............. }) .onFailure(err -> { // ............. }); } ``` In `WebhookHttpSinkHandler`, send the message by calling the `send()` method of `CommonHttpSinkHandler`, and then add its own handler (holding the response body) in the returned Future. ``` public void handle(ConnectRecord record) { for (URI url : super.getUrls()) { // ............. // send the request Future<HttpResponse<Buffer>> responseFuture = send(url, httpConnectRecord); // handle the response doHandle(responseFuture); } } private void doHandle(Future<HttpResponse<Buffer>> responseFuture) { responseFuture.onSuccess(res -> { // ................ }); } ``` ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/connector/HttpSinkConnector.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.connector; + +import org.apache.eventmesh.connector.http.sink.config.HttpSinkConfig; +import org.apache.eventmesh.openconnect.api.config.Config; +import org.apache.eventmesh.openconnect.api.connector.ConnectorContext; +import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext; +import org.apache.eventmesh.openconnect.api.sink.Sink; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.util.List; +import java.util.Objects; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class HttpSinkConnector implements Sink { + + private HttpSinkConfig httpSinkConfig; + + private WebClient webClient; + + private volatile boolean isRunning = false; + + @Override + public Class<? extends Config> configClass() { + return HttpSinkConfig.class; + } + + @Override + public void init(Config config) throws Exception { + httpSinkConfig = (HttpSinkConfig) config; + doInit(); + } + + @Override + public void init(ConnectorContext connectorContext) throws Exception { + SinkConnectorContext sinkConnectorContext = (SinkConnectorContext) connectorContext; + this.httpSinkConfig = (HttpSinkConfig) sinkConnectorContext.getSinkConfig(); + doInit(); + } + + @SneakyThrows + private void doInit() { + final Vertx vertx = Vertx.vertx(); + // TODO Add more configurations + WebClientOptions options = new WebClientOptions() Review Comment: I also need to use `vertx-junit5` to reconstruct the unit test and simulate a more realistic environment to verify my code. ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/WebhookHttpSinkHandler.java: ########## @@ -49,7 +49,7 @@ public class WebhookHttpSinkHandler extends CommonHttpSinkHandler { private final HttpWebhookConfig webhookConfig; // store the callback data - private final BlockingQueue<Object> callbackQueue; + private final BlockingQueue<JSONObject> callbackQueue; Review Comment: Resolved in https://github.com/apache/eventmesh/pull/4837/commits/408aaa6442dd232f51b3799a57dc1641f03ee72f ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private WebClient webClient; + + private final String type; + + // store the received data, when webhook is enabled + private final BlockingQueue<JSONObject> receivedDataQueue; + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig); + this.connectorConfig = sinkConnectorConfig; + this.receivedDataQueue = this.connectorConfig.isWebhook() ? new LinkedBlockingQueue<>() : null; + type = String.format("%s.%s.%s", + sinkConnectorConfig.getConnectorName(), + sinkConnectorConfig.isSsl() ? "https" : "http", + sinkConnectorConfig.isWebhook() ? "webhook" : "common"); + } + + /** + * Get the oldest data in the queue + * + * @return received data + */ + public Object getReceivedData() { + if (!this.connectorConfig.isWebhook()) { + return null; + } + return this.receivedDataQueue.poll(); + } Review Comment: My original intent was to provide an outlet for exposing callback data, but I'm not quite sure if I want to expose it this way or by opening an HTTPServer. ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private WebClient webClient; + + private final String type; + + // store the received data, when webhook is enabled + private final BlockingQueue<JSONObject> receivedDataQueue; + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig); + this.connectorConfig = sinkConnectorConfig; + this.receivedDataQueue = this.connectorConfig.isWebhook() ? new LinkedBlockingQueue<>() : null; + type = String.format("%s.%s.%s", + sinkConnectorConfig.getConnectorName(), + sinkConnectorConfig.isSsl() ? "https" : "http", + sinkConnectorConfig.isWebhook() ? "webhook" : "common"); + } + + /** + * Get the oldest data in the queue + * + * @return received data + */ + public Object getReceivedData() { + if (!this.connectorConfig.isWebhook()) { + return null; + } + return this.receivedDataQueue.poll(); + } + + /** + * Get all received data + * + * @return all received data + */ + public Object[] getAllReceivedData() { + if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) { + return new Object[0]; + } + Object[] arr = receivedDataQueue.toArray(); + receivedDataQueue.clear(); + return arr; + } + + + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + private void doInitWebClient() { + final Vertx vertx = Vertx.vertx(); + WebClientOptions options = new WebClientOptions() + .setDefaultHost(this.connectorConfig.getHost()) + .setDefaultPort(this.connectorConfig.getPort()) + .setSsl(this.connectorConfig.isSsl()) + .setKeepAlive(this.connectorConfig.isKeepAlive()) + .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000) + .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()) + .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize()); + this.webClient = WebClient.create(vertx, options); + } + + + @Override + public void handle(ConnectRecord record) { + // create headers + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); + + // convert ConnectRecord to HttpConnectRecord + HttpConnectRecord httpConnectRecord = convertToHttpConnectRecord(record); + + // send the request + this.webClient.post(this.connectorConfig.getPath()) + .putHeaders(headers) + .sendJson(httpConnectRecord) + .onSuccess(res -> { + Long timestamp = record.getTimestamp(); + Map<String, ?> offset = record.getPosition().getOffset().getOffset(); + log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); + // Determine whether the status code is 200 + if (res.statusCode() == HttpResponseStatus.OK.code()) { + // store the received data, when webhook is enabled + if (this.connectorConfig.isWebhook()) { + String dataStr = res.body().toString(); + if (dataStr.isEmpty()) { + log.warn("Received data is empty."); + return; + } + JSONObject receivedData = JSON.parseObject(dataStr); + if (receivedDataQueue.size() == Integer.MAX_VALUE) { + // if the queue is full, remove the oldest element + JSONObject removedData = receivedDataQueue.poll(); + log.info("The queue is full, remove the oldest element: {}", removedData); + } + boolean b = receivedDataQueue.offer(receivedData); + if (b) { + log.info("Successfully put the received data into the queue: {}", receivedData); + } else { + log.error("Failed to put the received data into the queue: {}", receivedData); + } + } + } else { + log.error("Unexpected response received. Record: timestamp={}, offset={}. Response: code={} header={}, body={}", + timestamp, + offset, + res.statusCode(), + res.headers(), + res.body().toString() + ); + } + }) + .onFailure(err -> { + Long timestamp = record.getTimestamp(); + Map<String, ?> offset = record.getPosition().getOffset().getOffset(); + log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err); + }); + } + + /** + * Convert ConnectRecord to HttpConnectRecord + * + * @param record the ConnectRecord to convert + * @return the converted HttpConnectRecord + */ + private HttpConnectRecord convertToHttpConnectRecord(ConnectRecord record) { + HttpConnectRecord httpConnectRecord = new HttpConnectRecord(); + httpConnectRecord.setType(this.type); + LocalDateTime currentTime = LocalDateTime.now(); + httpConnectRecord.setTimestamp(currentTime.toString()); + httpConnectRecord.setData(record); + return httpConnectRecord; + } Review Comment: OK, I will modify it. ########## eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handle/CommonHttpSinkHandler.java: ########## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.http.sink.handle; + +import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; +import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; + +import java.time.LocalDateTime; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpHeaders; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; + +import lombok.extern.slf4j.Slf4j; + +/** + * Common HttpSinkHandler to handle ConnectRecord + */ +@Slf4j +public class CommonHttpSinkHandler implements HttpSinkHandler { + + private final SinkConnectorConfig connectorConfig; + + private WebClient webClient; + + private final String type; + + // store the received data, when webhook is enabled + private final BlockingQueue<JSONObject> receivedDataQueue; + + public CommonHttpSinkHandler(SinkConnectorConfig sinkConnectorConfig) { + SinkConnectorConfig.populateFieldsWithDefaults(sinkConnectorConfig); + this.connectorConfig = sinkConnectorConfig; + this.receivedDataQueue = this.connectorConfig.isWebhook() ? new LinkedBlockingQueue<>() : null; + type = String.format("%s.%s.%s", + sinkConnectorConfig.getConnectorName(), + sinkConnectorConfig.isSsl() ? "https" : "http", + sinkConnectorConfig.isWebhook() ? "webhook" : "common"); + } + + /** + * Get the oldest data in the queue + * + * @return received data + */ + public Object getReceivedData() { + if (!this.connectorConfig.isWebhook()) { + return null; + } + return this.receivedDataQueue.poll(); + } + + /** + * Get all received data + * + * @return all received data + */ + public Object[] getAllReceivedData() { + if (!connectorConfig.isWebhook() || receivedDataQueue.isEmpty()) { + return new Object[0]; + } + Object[] arr = receivedDataQueue.toArray(); + receivedDataQueue.clear(); + return arr; + } + + + @Override + public void start() { + // Create WebClient + doInitWebClient(); + } + + private void doInitWebClient() { + final Vertx vertx = Vertx.vertx(); + WebClientOptions options = new WebClientOptions() + .setDefaultHost(this.connectorConfig.getHost()) + .setDefaultPort(this.connectorConfig.getPort()) + .setSsl(this.connectorConfig.isSsl()) + .setKeepAlive(this.connectorConfig.isKeepAlive()) + .setKeepAliveTimeout(this.connectorConfig.getKeepAliveTimeout() / 1000) + .setIdleTimeout(this.connectorConfig.getIdleTimeout()) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS) + .setConnectTimeout(this.connectorConfig.getConnectionTimeout()) + .setMaxPoolSize(this.connectorConfig.getMaxConnectionPoolSize()); + this.webClient = WebClient.create(vertx, options); + } + + + @Override + public void handle(ConnectRecord record) { + // create headers + MultiMap headers = HttpHeaders.headers() + .set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8"); + + // convert ConnectRecord to HttpConnectRecord + HttpConnectRecord httpConnectRecord = convertToHttpConnectRecord(record); + + // send the request + this.webClient.post(this.connectorConfig.getPath()) + .putHeaders(headers) + .sendJson(httpConnectRecord) + .onSuccess(res -> { + Long timestamp = record.getTimestamp(); + Map<String, ?> offset = record.getPosition().getOffset().getOffset(); + log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, offset); + // Determine whether the status code is 200 + if (res.statusCode() == HttpResponseStatus.OK.code()) { + // store the received data, when webhook is enabled + if (this.connectorConfig.isWebhook()) { + String dataStr = res.body().toString(); + if (dataStr.isEmpty()) { + log.warn("Received data is empty."); + return; + } + JSONObject receivedData = JSON.parseObject(dataStr); + if (receivedDataQueue.size() == Integer.MAX_VALUE) { + // if the queue is full, remove the oldest element + JSONObject removedData = receivedDataQueue.poll(); + log.info("The queue is full, remove the oldest element: {}", removedData); + } + boolean b = receivedDataQueue.offer(receivedData); + if (b) { + log.info("Successfully put the received data into the queue: {}", receivedData); + } else { + log.error("Failed to put the received data into the queue: {}", receivedData); + } + } + } else { + log.error("Unexpected response received. Record: timestamp={}, offset={}. Response: code={} header={}, body={}", + timestamp, + offset, + res.statusCode(), + res.headers(), + res.body().toString() + ); + } + }) + .onFailure(err -> { + Long timestamp = record.getTimestamp(); + Map<String, ?> offset = record.getPosition().getOffset().getOffset(); + log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, offset, err); Review Comment: Retry? But you seem to mention in https://github.com/apache/eventmesh/pull/4837#issuecomment-2061431376 that the retries have to be done in Runtime, so I didn't complete the retry function -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@eventmesh.apache.org For additional commands, e-mail: issues-h...@eventmesh.apache.org