ocean-zhc commented on code in PR #9292:
URL: https://github.com/apache/seatunnel/pull/9292#discussion_r2094795759
##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java:
##########
@@ -48,18 +60,96 @@ public HttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
SerializationSchema serializationSchema) {
+ this(seaTunnelRowType, httpParameter, serializationSchema, false, 1,
0);
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs) {
+ this(
+ seaTunnelRowType,
+ httpParameter,
+ new JsonSerializationSchema(seaTunnelRowType),
+ arrayMode,
+ batchSize,
+ requestIntervalMs);
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ SerializationSchema serializationSchema,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
- this.httpClient = new HttpClientProvider(httpParameter);
+ this.httpClient = createHttpClient(httpParameter);
this.serializationSchema = serializationSchema;
+ this.arrayMode = arrayMode;
+ this.batchSize = batchSize;
+ this.requestIntervalMs = requestIntervalMs;
+ this.batchBuffer = new ArrayList<>(batchSize);
+ this.lastRequestTime = System.currentTimeMillis();
}
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (!arrayMode) {
+ // Object mode: send each record individually, ignore batch_size
setting
+ writeSingleRecord(element);
+ } else {
+ // Array mode: batch processing
+ batchBuffer.add(element);
+ if (batchBuffer.size() >= batchSize) {
+ flush();
+ }
+ }
+ }
+
+ private void writeSingleRecord(SeaTunnelRow element) throws IOException {
byte[] serialize = serializationSchema.serialize(element);
String body = new String(serialize);
+ doHttpRequest(body);
+ }
+
+ private void flush() throws IOException {
+ if (batchBuffer.isEmpty()) {
+ return;
+ }
+
+ // Check request interval
+ long currentTime = System.currentTimeMillis();
+ long timeSinceLastRequest = currentTime - lastRequestTime;
+ if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs)
{
+ try {
+ Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Sleep interrupted", e);
Review Comment:
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]