ocean-zhc commented on code in PR #9292:
URL: https://github.com/apache/seatunnel/pull/9292#discussion_r2080761686
##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java:
##########
@@ -48,18 +58,107 @@ public HttpSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HttpParameter httpParameter,
SerializationSchema serializationSchema) {
+ this(seaTunnelRowType, httpParameter, serializationSchema, false, 1,
0, "json");
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs,
+ String format) {
+ this(
+ seaTunnelRowType,
+ httpParameter,
+ new JsonSerializationSchema(seaTunnelRowType),
+ arrayMode,
+ batchSize,
+ requestIntervalMs,
+ format);
+ }
+
+ public HttpSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ HttpParameter httpParameter,
+ SerializationSchema serializationSchema,
+ boolean arrayMode,
+ int batchSize,
+ int requestIntervalMs,
+ String format) {
this.seaTunnelRowType = seaTunnelRowType;
this.httpParameter = httpParameter;
- this.httpClient = new HttpClientProvider(httpParameter);
+ this.httpClient = createHttpClient(httpParameter);
this.serializationSchema = serializationSchema;
+ this.arrayMode = arrayMode;
+ this.batchSize = batchSize;
+ this.requestIntervalMs = requestIntervalMs;
+ this.format = format;
+ this.batchBuffer = new ArrayList<>(batchSize);
+ this.lastRequestTime = System.currentTimeMillis();
}
@Override
public void write(SeaTunnelRow element) throws IOException {
+ if (!arrayMode) {
+ // Object mode: send each record individually, ignore batch_size
setting
+ writeSingleRecord(element);
+ } else {
+ // Array mode: batch processing
+ batchBuffer.add(element);
+ if (batchBuffer.size() >= batchSize) {
+ flush();
+ }
+ }
+ }
+
+ private void writeSingleRecord(SeaTunnelRow element) throws IOException {
byte[] serialize = serializationSchema.serialize(element);
String body = new String(serialize);
+ doHttpRequest(body);
+ }
+
+ private void flush() throws IOException {
+ if (batchBuffer.isEmpty()) {
+ return;
+ }
+
+ // Check request interval
+ long currentTime = System.currentTimeMillis();
+ long timeSinceLastRequest = currentTime - lastRequestTime;
+ if (requestIntervalMs > 0 && timeSinceLastRequest < requestIntervalMs)
{
+ try {
+ Thread.sleep(requestIntervalMs - timeSinceLastRequest);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Sleep interrupted", e);
+ }
+ }
+
+ // Array mode: serialize batch data
+ if ("json".equalsIgnoreCase(format)) {
+ // Constructing JSON arrays
+ List<String> jsonRecords = new ArrayList<>(batchBuffer.size());
+ for (SeaTunnelRow row : batchBuffer) {
+ byte[] serialize = serializationSchema.serialize(row);
+ jsonRecords.add(new String(serialize));
+ }
+ String body = "[" + String.join(",", jsonRecords) + "]";
Review Comment:
The current way of manually constructing JSON arrays in the code does have
some potential problems, so I have now constructed the JSON arrays using Jackson.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]