Copilot commented on code in PR #9292:
URL: https://github.com/apache/seatunnel/pull/9292#discussion_r2080746270


##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java:
##########
@@ -48,18 +58,107 @@ public HttpSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
             HttpParameter httpParameter,
             SerializationSchema serializationSchema) {
+        this(seaTunnelRowType, httpParameter, serializationSchema, false, 1, 
0, "json");
+    }
+
+    public HttpSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            HttpParameter httpParameter,
+            boolean arrayMode,
+            int batchSize,
+            int requestIntervalMs,
+            String format) {
+        this(
+                seaTunnelRowType,
+                httpParameter,
+                new JsonSerializationSchema(seaTunnelRowType),
+                arrayMode,
+                batchSize,
+                requestIntervalMs,
+                format);
+    }
+
+    public HttpSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            HttpParameter httpParameter,
+            SerializationSchema serializationSchema,
+            boolean arrayMode,
+            int batchSize,
+            int requestIntervalMs,
+            String format) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
-        this.httpClient = new HttpClientProvider(httpParameter);
+        this.httpClient = createHttpClient(httpParameter);
         this.serializationSchema = serializationSchema;
+        this.arrayMode = arrayMode;
+        this.batchSize = batchSize;
+        this.requestIntervalMs = requestIntervalMs;
+        this.format = format;
+        this.batchBuffer = new ArrayList<>(batchSize);
+        this.lastRequestTime = System.currentTimeMillis();
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (!arrayMode) {
+            // Object mode: send each record individually, ignore batch_size 
setting
+            writeSingleRecord(element);
+        } else {
+            // Array mode: batch processing
+            batchBuffer.add(element);
+            if (batchBuffer.size() >= batchSize) {
+                flush();
+            }
+        }
+    }
+
+    private void writeSingleRecord(SeaTunnelRow element) throws IOException {
         byte[] serialize = serializationSchema.serialize(element);
         String body = new String(serialize);
+        doHttpRequest(body);
+    }
+
+    private void flush() throws IOException {
+        if (batchBuffer.isEmpty()) {
+            return;
+        }
+
+        // Check request interval
+        long currentTime = System.currentTimeMillis();
+        long timeSinceLastRequest = currentTime - lastRequestTime;
+        if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs) 
{
+            try {
+                Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Sleep interrupted", e);
+            }
+        }
+
+        // Array mode: serialize batch data
+        if ("json".equalsIgnoreCase(format)) {
+            // Constructing JSON arrays
+            List<String> jsonRecords = new ArrayList<>(batchBuffer.size());
+            for (SeaTunnelRow row : batchBuffer) {
+                byte[] serialize = serializationSchema.serialize(row);
+                jsonRecords.add(new String(serialize));
+            }
+            String body = "[" + String.join(",", jsonRecords) + "]";

Review Comment:
   [nitpick] Consider using a dedicated JSON library to construct the JSON 
array, which can help ensure proper formatting and handle edge cases more 
robustly.



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java:
##########
@@ -17,4 +17,33 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.config;
 
-public class HttpSinkOptions extends HttpCommonOptions {}
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class HttpSinkOptions extends HttpCommonOptions {
+    public static final Option<Boolean> ARRAY_MODE =
+            Options.key("array_mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Send data as a JSON array when true, or as a 
single JSON object when false (default)");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The batch size of records to send in one HTTP 
request. Only works when array_mode is true");
+
+    public static final Option<Integer> REQUEST_INTERVAL_MS =
+            Options.key("request_interval_ms")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription("The interval milliseconds between two 
HTTP requests");
+
+    public static final Option<String> FORMAT =

Review Comment:
   [nitpick] The description for the FORMAT option indicates that only 'json' 
is supported; consider clarifying the documentation or renaming the option to 
avoid confusion regarding supported formats.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to