Copilot commented on code in PR #9292:
URL: https://github.com/apache/seatunnel/pull/9292#discussion_r2080746270
##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java:
##########
@@ -48,18 +58,107 @@ public HttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
SerializationSchema serializationSchema) {
+ this(seaTunnelRowType, httpParameter, serializationSchema, false, 1, 0, "json");
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs,
+ String format) {
+ this(
+ seaTunnelRowType,
+ httpParameter,
+ new JsonSerializationSchema(seaTunnelRowType),
+ arrayMode,
+ batchSize,
+ requestIntervalMs,
+ format);
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ SerializationSchema serializationSchema,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs,
+ String format) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
- this.httpClient = new HttpClientProvider(httpParameter);
+ this.httpClient = createHttpClient(httpParameter);
this.serializationSchema = serializationSchema;
+ this.arrayMode = arrayMode;
+ this.batchSize = batchSize;
+ this.requestIntervalMs = requestIntervalMs;
+ this.format = format;
+ this.batchBuffer = new ArrayList<>(batchSize);
+ this.lastRequestTime = System.currentTimeMillis();
}
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (!arrayMode) {
+ // Object mode: send each record individually, ignore batch_size setting
+ writeSingleRecord(element);
+ } else {
+ // Array mode: batch processing
+ batchBuffer.add(element);
+ if (batchBuffer.size() >= batchSize) {
+ flush();
+ }
+ }
+ }
+
+ private void writeSingleRecord(SeaTunnelRow element) throws IOException {
byte[] serialize = serializationSchema.serialize(element);
String body = new String(serialize);
+ doHttpRequest(body);
+ }
+
+ private void flush() throws IOException {
+ if (batchBuffer.isEmpty()) {
+ return;
+ }
+
+ // Check request interval
+ long currentTime = System.currentTimeMillis();
+ long timeSinceLastRequest = currentTime - lastRequestTime;
+ if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs) {
+ try {
+ Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Sleep interrupted", e);
+ }
+ }
+
+ // Array mode: serialize batch data
+ if ("json".equalsIgnoreCase(format)) {
+ // Constructing JSON arrays
+ List<String> jsonRecords = new ArrayList<>(batchBuffer.size());
+ for (SeaTunnelRow row : batchBuffer) {
+ byte[] serialize = serializationSchema.serialize(row);
+ jsonRecords.add(new String(serialize));
+ }
+ String body = "[" + String.join(",", jsonRecords) + "]";
Review Comment:
[nitpick] Consider building the JSON array with a dedicated JSON library
instead of concatenating the serialized records with "[", "," and "]" by
hand; a library guarantees well-formed output and handles edge cases more
robustly.
##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java:
##########
@@ -17,4 +17,33 @@
package org.apache.seatunnel.connectors.seatunnel.http.config;
-public class HttpSinkOptions extends HttpCommonOptions {}
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class HttpSinkOptions extends HttpCommonOptions {
+ public static final Option<Boolean> ARRAY_MODE =
+ Options.key("array_mode")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Send data as a JSON array when true, or as a single JSON object when false (default)");
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "The batch size of records to send in one HTTP request. Only works when array_mode is true");
+
+ public static final Option<Integer> REQUEST_INTERVAL_MS =
+ Options.key("request_interval_ms")
+ .intType()
+ .defaultValue(0)
+ .withDescription("The interval milliseconds between two HTTP requests");
+
+ public static final Option<String> FORMAT =
Review Comment:
[nitpick] The description of the FORMAT option states that only 'json' is
supported; consider either clarifying this in the documentation or renaming
the option, so users are not misled into expecting other formats to work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]