ocean-zhc commented on code in PR #9292:
URL: https://github.com/apache/seatunnel/pull/9292#discussion_r2094793706


##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java:
##########
@@ -48,18 +60,96 @@ public HttpSinkWriter(
             SeaTunnelRowType seaTunnelRowType,
             HttpParameter httpParameter,
             SerializationSchema serializationSchema) {
+        this(seaTunnelRowType, httpParameter, serializationSchema, false, 1, 0);
+    }
+
+    public HttpSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            HttpParameter httpParameter,
+            boolean arrayMode,
+            int batchSize,
+            int requestIntervalMs) {
+        this(
+                seaTunnelRowType,
+                httpParameter,
+                new JsonSerializationSchema(seaTunnelRowType),
+                arrayMode,
+                batchSize,
+                requestIntervalMs);
+    }
+
+    public HttpSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            HttpParameter httpParameter,
+            SerializationSchema serializationSchema,
+            boolean arrayMode,
+            int batchSize,
+            int requestIntervalMs) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
-        this.httpClient = new HttpClientProvider(httpParameter);
+        this.httpClient = createHttpClient(httpParameter);
         this.serializationSchema = serializationSchema;
+        this.arrayMode = arrayMode;
+        this.batchSize = batchSize;
+        this.requestIntervalMs = requestIntervalMs;
+        this.batchBuffer = new ArrayList<>(batchSize);
+        this.lastRequestTime = System.currentTimeMillis();
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (!arrayMode) {
+            // Object mode: send each record individually, ignore batch_size setting
+            writeSingleRecord(element);
+        } else {
+            // Array mode: batch processing
+            batchBuffer.add(element);
+            if (batchBuffer.size() >= batchSize) {
+                flush();
+            }
+        }
+    }
+
+    private void writeSingleRecord(SeaTunnelRow element) throws IOException {
         byte[] serialize = serializationSchema.serialize(element);
         String body = new String(serialize);
+        doHttpRequest(body);
+    }
+
+    private void flush() throws IOException {
+        if (batchBuffer.isEmpty()) {
+            return;
+        }
+
+        // Check request interval
+        long currentTime = System.currentTimeMillis();
+        long timeSinceLastRequest = currentTime - lastRequestTime;
+        if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs) {
+            try {
+                Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("Sleep interrupted", e);
+            }
+        }
+
+        // Array mode: serialize batch data as JSON
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode arrayNode = mapper.createArrayNode();
+        for (SeaTunnelRow row : batchBuffer) {
+            byte[] serialize = serializationSchema.serialize(row);
+            arrayNode.add(new String(serialize));
+        }
+        String body = mapper.writeValueAsString(arrayNode);
+        doHttpRequest(body);
+
+        batchBuffer.clear();
+        lastRequestTime = System.currentTimeMillis();
+    }
+
+    private void doHttpRequest(String body) {
         try {
-            // only support post web hook
+            // Send HTTP request

Review Comment:
   +1
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to