This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04ee8aca04 [Feature][http-Sink] Implementing http batch writes (#9292)
04ee8aca04 is described below

commit 04ee8aca04192c35cdcc548f46309ba9cd0f5bf9
Author: ocean-zhc <[email protected]>
AuthorDate: Tue May 20 09:38:11 2025 +0800

    [Feature][http-Sink] Implementing http batch writes (#9292)
---
 docs/en/connector-v2/sink/Http.md                  |  18 ++
 docs/zh/connector-v2/sink/Http.md                  |  18 ++
 .../seatunnel/http/config/HttpParameter.java       |   3 +
 .../seatunnel/http/config/HttpSinkOptions.java     |  25 ++-
 .../connectors/seatunnel/http/sink/HttpSink.java   |  10 +
 .../seatunnel/http/sink/HttpSinkFactory.java       |   3 +
 .../seatunnel/http/sink/HttpSinkWriter.java        | 101 ++++++++++-
 .../http/sink/HttpSinkBatchWriterTest.java         | 201 +++++++++++++++++++++
 8 files changed, 376 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector-v2/sink/Http.md 
b/docs/en/connector-v2/sink/Http.md
index 6cd7f7a884..658102d7f5 100644
--- a/docs/en/connector-v2/sink/Http.md
+++ b/docs/en/connector-v2/sink/Http.md
@@ -44,6 +44,9 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 | retry_backoff_max_ms        | Int    | No       | 10000   | The maximum 
retry-backoff times(millis) if request http failed                              
                |
 | connect_timeout_ms          | Int    | No       | 12000   | Connection 
timeout setting, default 12s.                                                   
                 |
 | socket_timeout_ms           | Int    | No       | 60000   | Socket timeout 
setting, default 60s.                                                           
             |
+| array_mode                  | Boolean| No       | false   | Send data as a 
JSON array when true, or as a single JSON object when false (default)           
             |
+| batch_size                  | Int    | No       | 1       | The batch size 
of records to send in one HTTP request. Only works when array_mode is true.     
             |
+| request_interval_ms         | Int    | No       | 0       | The interval 
milliseconds between two HTTP requests, to avoid sending requests too 
frequently.              |
 | common-options              |        | No       | -       | Sink plugin 
common parameters, please refer to [Sink Common 
Options](../sink-common-options.md) for details |
 
 ## Example
@@ -59,6 +62,21 @@ Http {
 }
 ```
 
+### With Batch Processing
+
+```hocon
+Http {
+    url = "http://localhost/test/webhook";
+    headers {
+        token = "9e32e859ef044462a257e1fc76730066"
+        Content-Type = "application/json"
+    }
+    array_mode = true
+    batch_size = 50
+    request_interval_ms = 500
+}
+```
+
 ### Multiple table
 
 #### example1
diff --git a/docs/zh/connector-v2/sink/Http.md 
b/docs/zh/connector-v2/sink/Http.md
index 1e9e970c55..b7d850639b 100644
--- a/docs/zh/connector-v2/sink/Http.md
+++ b/docs/zh/connector-v2/sink/Http.md
@@ -42,6 +42,9 @@ import ChangeLog from '../changelog/connector-http.md';
 | retry_backoff_max_ms        | Int    | 否    | 10000 | http请求失败,最大重试回退时间(毫秒)  
                                    |
 | connect_timeout_ms          | Int    | 否    | 12000 | 连接超时设置,默认12s           
                                    |
 | socket_timeout_ms           | Int    | 否    | 60000 | 套接字超时设置,默认为60s         
                                    |
+| array_mode                  | Boolean| 否    | false | 
为true时将数据作为JSON数组发送,为false时作为单个JSON对象发送(默认)                |
+| batch_size                  | Int    | 否    | 1     | 
在一个HTTP请求中发送的记录批量大小。仅在array_mode为true时有效                   |
+| request_interval_ms         | Int    | 否    | 0     | 
两次HTTP请求之间的间隔毫秒数,以避免请求过于频繁                                 |
 | common-options              |        | 否    | -     | Sink插件常用参数,请参考 
[Sink常用选项 ](../sink-common-options.md) 了解详情 |
 
 ## 示例
@@ -57,6 +60,21 @@ Http {
 }
 ```
 
+### 带批处理的示例
+
+```hocon
+Http {
+    url = "http://localhost/test/webhook";
+    headers {
+        token = "9e32e859ef044462a257e1fc76730066"
+        Content-Type = "application/json"
+    }
+    array_mode = true
+    batch_size = 50
+    request_interval_ms = 500
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index 9e4fcadb6d..272f202329 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -42,6 +42,9 @@ public class HttpParameter implements Serializable {
     protected boolean enableMultilines;
     protected int connectTimeoutMs;
     protected int socketTimeoutMs;
+    protected boolean arrayMode = false;
+    protected int batchSize = 1;
+    protected int requestIntervalMs = 0;
 
     public void buildWithConfig(ReadonlyConfig pluginConfig) {
         // set url
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
index 58328a9e98..1b4677efc2 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java
@@ -17,4 +17,27 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.config;
 
-public class HttpSinkOptions extends HttpCommonOptions {}
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class HttpSinkOptions extends HttpCommonOptions {
+    public static final Option<Boolean> ARRAY_MODE =
+            Options.key("array_mode")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Send data as a JSON array when true, or as a 
single JSON object when false (default)");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "The batch size of records to send in one HTTP 
request. Only works when array_mode is true");
+
+    public static final Option<Integer> REQUEST_INTERVAL_MS =
+            Options.key("request_interval_ms")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription("The interval milliseconds between two 
HTTP requests");
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
index 7274a464bf..c19c2b18c7 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java
@@ -47,6 +47,16 @@ public class HttpSink extends 
AbstractSimpleSink<SeaTunnelRow, Void>
         if (pluginConfig.getOptional(HttpSinkOptions.PARAMS).isPresent()) {
             httpParameter.setHeaders(pluginConfig.get(HttpSinkOptions.PARAMS));
         }
+        if (pluginConfig.getOptional(HttpSinkOptions.ARRAY_MODE).isPresent()) {
+            
httpParameter.setArrayMode(pluginConfig.get(HttpSinkOptions.ARRAY_MODE));
+        }
+        if (pluginConfig.getOptional(HttpSinkOptions.BATCH_SIZE).isPresent()) {
+            
httpParameter.setBatchSize(pluginConfig.get(HttpSinkOptions.BATCH_SIZE));
+        }
+        if 
(pluginConfig.getOptional(HttpSinkOptions.REQUEST_INTERVAL_MS).isPresent()) {
+            httpParameter.setRequestIntervalMs(
+                    pluginConfig.get(HttpSinkOptions.REQUEST_INTERVAL_MS));
+        }
         this.catalogTable = catalogTable;
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
index 0f057e9f04..b0a192893b 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java
@@ -48,6 +48,9 @@ public class HttpSinkFactory implements TableSinkFactory {
                 .optional(HttpSinkOptions.RETRY)
                 .optional(HttpSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
                 .optional(HttpSinkOptions.RETRY_BACKOFF_MAX_MS)
+                .optional(HttpSinkOptions.ARRAY_MODE)
+                .optional(HttpSinkOptions.BATCH_SIZE)
+                .optional(HttpSinkOptions.REQUEST_INTERVAL_MS)
                 .optional(SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 0333b8f37a..cde7eabe36 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.sink;
 
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -30,7 +34,10 @@ import 
org.apache.seatunnel.format.json.JsonSerializationSchema;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 @Slf4j
 public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
@@ -40,6 +47,13 @@ public class HttpSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     protected final HttpParameter httpParameter;
     protected final SerializationSchema serializationSchema;
 
+    // Batch related fields
+    private final boolean arrayMode;
+    private final int batchSize;
+    private final int requestIntervalMs;
+    private final List<SeaTunnelRow> batchBuffer;
+    private long lastRequestTime;
+
     public HttpSinkWriter(SeaTunnelRowType seaTunnelRowType, HttpParameter 
httpParameter) {
         this(seaTunnelRowType, httpParameter, new 
JsonSerializationSchema(seaTunnelRowType));
     }
@@ -48,18 +62,81 @@ public class HttpSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
             SeaTunnelRowType seaTunnelRowType,
             HttpParameter httpParameter,
             SerializationSchema serializationSchema) {
+        this(
+                seaTunnelRowType,
+                httpParameter,
+                serializationSchema,
+                httpParameter.isArrayMode(),
+                httpParameter.getBatchSize(),
+                httpParameter.getRequestIntervalMs());
+    }
+
+    public HttpSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            HttpParameter httpParameter,
+            SerializationSchema serializationSchema,
+            boolean arrayMode,
+            int batchSize,
+            int requestIntervalMs) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.httpParameter = httpParameter;
-        this.httpClient = new HttpClientProvider(httpParameter);
+        this.httpClient = createHttpClient(httpParameter);
         this.serializationSchema = serializationSchema;
+        this.arrayMode = arrayMode;
+        this.batchSize = batchSize;
+        this.requestIntervalMs = requestIntervalMs;
+        this.batchBuffer = new ArrayList<>(batchSize);
+        this.lastRequestTime = System.currentTimeMillis();
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
+        if (!arrayMode) {
+            writeSingleRecord(element);
+        } else {
+            batchBuffer.add(element);
+            if (batchBuffer.size() >= batchSize) {
+                flush();
+            }
+        }
+    }
+
+    private void writeSingleRecord(SeaTunnelRow element) throws IOException {
         byte[] serialize = serializationSchema.serialize(element);
         String body = new String(serialize);
+        doHttpRequest(body);
+    }
+
+    private void flush() throws IOException {
+        if (batchBuffer.isEmpty()) {
+            return;
+        }
+        long currentTime = System.currentTimeMillis();
+        long timeSinceLastRequest = currentTime - lastRequestTime;
+        if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs) 
{
+            try {
+                Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // Array mode: serialize batch data as JSON
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode arrayNode = mapper.createArrayNode();
+        for (SeaTunnelRow row : batchBuffer) {
+            byte[] serialize = serializationSchema.serialize(row);
+            arrayNode.add(new String(serialize));
+        }
+        String body = mapper.writeValueAsString(arrayNode);
+        doHttpRequest(body);
+
+        batchBuffer.clear();
+        lastRequestTime = System.currentTimeMillis();
+    }
+
+    private void doHttpRequest(String body) {
         try {
-            // only support post web hook
             HttpResponse response =
                     httpClient.doPost(httpParameter.getUrl(), 
httpParameter.getHeaders(), body);
             if (HttpResponse.STATUS_OK == response.getCode()) {
@@ -76,8 +153,28 @@ public class HttpSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
 
     @Override
     public void close() throws IOException {
+        if (arrayMode) {
+            flush();
+        }
         if (Objects.nonNull(httpClient)) {
             httpClient.close();
         }
     }
+
+    @Override
+    public Optional<Void> prepareCommit() {
+        if (arrayMode) {
+            try {
+                flush();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to flush data in 
prepareCommit", e);
+            }
+        }
+        return Optional.empty();
+    }
+
+    @VisibleForTesting
+    protected HttpClientProvider createHttpClient(HttpParameter httpParameter) 
{
+        return new HttpClientProvider(httpParameter);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java
new file mode 100644
index 0000000000..ee8769f595
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.sink;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class HttpSinkBatchWriterTest {
+
+    private static final String TEST_URL = "http://example.com/test";;
+    private static final int BATCH_SIZE = 3;
+    private static final int REQUEST_INTERVAL_MS = 0;
+
+    @Mock private HttpClientProvider httpClientProvider;
+
+    @Captor private ArgumentCaptor<String> requestBodyCaptor;
+
+    private HttpParameter httpParameter;
+    private SeaTunnelRowType rowType;
+    private TestableHttpSinkWriter sinkWriter;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        // Setting HTTP Parameters
+        httpParameter = new HttpParameter();
+        httpParameter.setUrl(TEST_URL);
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Content-Type", "application/json");
+        httpParameter.setHeaders(headers);
+
+        // Simulate HTTP response
+        HttpResponse mockResponse = Mockito.mock(HttpResponse.class);
+        when(mockResponse.getCode()).thenReturn(HttpResponse.STATUS_OK);
+        when(httpClientProvider.doPost(anyString(), any(), 
anyString())).thenReturn(mockResponse);
+
+        // Creating Row Types
+        String[] fieldNames = new String[] {"id", "name", "age"};
+        SeaTunnelDataType<?>[] dataTypes =
+                new SeaTunnelDataType<?>[] {
+                    BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.INT_TYPE
+                };
+        rowType = new SeaTunnelRowType(fieldNames, dataTypes);
+    }
+
+    @Test
+    public void testDefaultParameterValues() throws Exception {
+        // No parameters are set, use default values
+        // default:arrayMode = false, batchSize = 1, requestIntervalMs = 0
+        HttpParameter defaultHttpParameter = new HttpParameter();
+        defaultHttpParameter.setUrl(TEST_URL);
+        Map<String, String> headers = new HashMap<>();
+        headers.put("Content-Type", "application/json");
+        defaultHttpParameter.setHeaders(headers);
+
+        // Verify the default parameter value
+        assertFalse(defaultHttpParameter.isArrayMode());
+        assertEquals(1, defaultHttpParameter.getBatchSize());
+        assertEquals(0, defaultHttpParameter.getRequestIntervalMs());
+
+        sinkWriter = new TestableHttpSinkWriter(rowType, defaultHttpParameter);
+
+        // Write 3 records
+        for (int i = 0; i < 3; i++) {
+            SeaTunnelRow row = createTestRow(i + 1, "user" + (i + 1), 20 + i);
+            sinkWriter.write(row);
+        }
+
+        // In the default object mode, there should be 3 HTTP requests, each 
record is sent
+        // separately
+        verify(httpClientProvider, times(3))
+                .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+        // Verify request format (single object)
+        for (String requestBody : requestBodyCaptor.getAllValues()) {
+            assertTrue(requestBody.startsWith("{"));
+            assertTrue(requestBody.endsWith("}"));
+        }
+    }
+
+    @Test
+    public void testObjectModeIgnoresBatchSize() throws Exception {
+        // Use object mode (default) to ignore batch size
+        httpParameter.setArrayMode(false);
+        httpParameter.setBatchSize(BATCH_SIZE);
+        httpParameter.setRequestIntervalMs(REQUEST_INTERVAL_MS);
+        sinkWriter = new TestableHttpSinkWriter(rowType, httpParameter);
+
+        // Write 3 records (equal to batch size)
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            SeaTunnelRow row = createTestRow(i + 1, "user" + (i + 1), 20 + i);
+            sinkWriter.write(row);
+        }
+
+        // In object mode, there should be 3 HTTP requests, each record sent 
separately
+        verify(httpClientProvider, times(3))
+                .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+        // Validation request format (single object)
+        for (String requestBody : requestBodyCaptor.getAllValues()) {
+            assertTrue(requestBody.startsWith("{"));
+            assertTrue(requestBody.endsWith("}"));
+        }
+    }
+
+    @Test
+    public void testArrayModeWithBatch() throws Exception {
+        // Use array mode to turn on batch processing
+        httpParameter.setArrayMode(true);
+        httpParameter.setBatchSize(BATCH_SIZE);
+        httpParameter.setRequestIntervalMs(REQUEST_INTERVAL_MS);
+        sinkWriter = new TestableHttpSinkWriter(rowType, httpParameter);
+
+        // Write 5 records (over batch size)
+        for (int i = 0; i < 5; i++) {
+            SeaTunnelRow row = createTestRow(i + 1, "user" + (i + 1), 20 + i);
+            sinkWriter.write(row);
+        }
+
+        // There should only be 1 HTTP request (the first batch of 3), the 
remaining 2 have not yet
+        // met the batch size
+        verify(httpClientProvider, times(1))
+                .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+        // Validation request format (array)
+        String requestBody = requestBodyCaptor.getValue();
+        assertTrue(requestBody.startsWith("["));
+        assertTrue(requestBody.endsWith("]"));
+
+        // Close SinkWriter, should send another request (for the remaining 2 
records)
+        sinkWriter.close();
+        verify(httpClientProvider, times(2))
+                .doPost(eq(TEST_URL), any(), requestBodyCaptor.capture());
+
+        // Validating the content of the second request
+        requestBody = requestBodyCaptor.getValue();
+        assertTrue(requestBody.startsWith("["));
+        assertTrue(requestBody.endsWith("]"));
+    }
+
+    private SeaTunnelRow createTestRow(int id, String name, int age) {
+        return new SeaTunnelRow(new Object[] {id, name, age});
+    }
+
+    private class TestableHttpSinkWriter extends HttpSinkWriter {
+        public TestableHttpSinkWriter(
+                SeaTunnelRowType seaTunnelRowType, HttpParameter 
httpParameter) {
+            super(seaTunnelRowType, httpParameter);
+        }
+
+        @Override
+        protected HttpClientProvider createHttpClient(HttpParameter 
httpParameter) {
+            return httpClientProvider;
+        }
+    }
+}

Reply via email to