Copilot commented on code in PR #61626:
URL: https://github.com/apache/doris/pull/61626#discussion_r2973885313


##########
sdk/java-doris-sdk/src/main/resources/log4j.properties:
##########
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, STDOUT
+
+log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
+log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
+log4j.appender.STDOUT.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n

Review Comment:
   Shipping a `log4j.properties` in `src/main/resources` means the SDK jar can 
override an application's logging configuration at runtime. Libraries typically 
should not include logging config; move this to `src/test/resources` or the 
examples module, and let applications choose/configure their own logging 
backend.
   ```suggestion
   
################################################################################
   ```



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java:
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.RequestBuilder;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Thread-safe Doris stream load client.
+ *
+ * <p>Usage:
+ * <pre>
+ * DorisLoadClient client = new DorisLoadClient(config);
+ * LoadResponse resp = client.load(new ByteArrayInputStream(data));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... }
+ * </pre>
+ *
+ * <p>Thread safety: this instance can be shared across threads.
+ * Each {@link #load} call must receive an independent InputStream.
+ */
+public class DorisLoadClient implements AutoCloseable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(DorisLoadClient.class);
+    /** Absolute maximum for a single backoff interval: 5 minutes. */
+    private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L;
+
+    private final DorisConfig config;
+    private final StreamLoader streamLoader;
+
+    public DorisLoadClient(DorisConfig config) {
+        this.config = config;
+        this.streamLoader = new StreamLoader();
+    }
+
+    /** Package-private constructor for testing with a mock StreamLoader. */
+    DorisLoadClient(DorisConfig config, StreamLoader streamLoader) {
+        this.config = config;
+        this.streamLoader = streamLoader;
+    }
+
+    /**
+     * Loads data from the given InputStream into Doris via stream load.
+     * The InputStream is fully consumed and buffered before the first attempt.
+     * Retries with exponential backoff on retryable errors (network/HTTP 
failures).
+     * Business failures (bad data, schema mismatch, auth) are returned 
immediately without retry.
+     *
+     * @param inputStream data to load (consumed once; must not be shared 
across threads)
+     * @return LoadResponse with status SUCCESS or FAILURE
+     * @throws IOException if the stream cannot be read or all retries are 
exhausted
+     */
+    public LoadResponse load(InputStream inputStream) throws IOException {
+        int maxRetries = 6;
+        long baseIntervalMs = 1000L;
+        long maxTotalTimeMs = 60000L;
+
+        if (config.getRetry() != null) {
+            maxRetries = config.getRetry().getMaxRetryTimes();
+            baseIntervalMs = config.getRetry().getBaseIntervalMs();
+            maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs();
+        }
+
+        log.info("Starting stream load: {}.{}", config.getDatabase(), 
config.getTable());
+
+        // Buffer the InputStream once so retries can replay the body
+        byte[] bodyData = readAll(inputStream);
+

Review Comment:
   `load(InputStream)` fully consumes the provided stream but does not close 
it, and the Javadoc doesn’t state who owns/should close the stream. For 
non-memory streams (e.g., `FileInputStream`) this can leak resources. Either 
document that the caller must close the stream, or wrap the read in a 
try-with-resources and take ownership explicitly.



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * Unified data generation utilities for all Doris Stream Load examples.
+ * All examples use a unified Order schema for consistency.
+ *
+ * <p>Table DDL (orders):
+ * <pre>
+ * CREATE TABLE orders (
+ *   order_id      BIGINT,
+ *   customer_id   BIGINT,
+ *   product_name  VARCHAR(200),
+ *   category      VARCHAR(50),
+ *   brand         VARCHAR(50),
+ *   quantity      INT,
+ *   unit_price    DECIMAL(10,2),
+ *   total_amount  DECIMAL(10,2),
+ *   status        VARCHAR(20),
+ *   order_date    DATETIME,
+ *   region        VARCHAR(20)
+ * ) DISTRIBUTED BY HASH(order_id) BUCKETS 10
+ * PROPERTIES (
+ * "replication_allocation" = "tag.location.default: 1",

Review Comment:
   The DDL snippet in the Javadoc has a trailing comma after the only property 
entry, which makes the SQL invalid as written. Please remove the extra comma 
(or add another property) so users can copy/paste the example DDL successfully.
   ```suggestion
    * "replication_allocation" = "tag.location.default: 1"
   ```



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java:
##########
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * Unified data generation utilities for all Doris Stream Load examples.
+ * All examples use a unified Order schema for consistency.
+ *
+ * <p>Table DDL (orders):
+ * <pre>
+ * CREATE TABLE orders (
+ *   order_id      BIGINT,
+ *   customer_id   BIGINT,
+ *   product_name  VARCHAR(200),
+ *   category      VARCHAR(50),
+ *   brand         VARCHAR(50),
+ *   quantity      INT,
+ *   unit_price    DECIMAL(10,2),
+ *   total_amount  DECIMAL(10,2),
+ *   status        VARCHAR(20),
+ *   order_date    DATETIME,
+ *   region        VARCHAR(20)
+ * ) DISTRIBUTED BY HASH(order_id) BUCKETS 10
+ * PROPERTIES (
+ * "replication_allocation" = "tag.location.default: 1",
+ * )
+ * </pre>
+ */
+public class DataGenerator {
+
+    private static final String[] CATEGORIES = {
+            "Electronics", "Clothing", "Books", "Home", "Sports",
+            "Beauty", "Automotive", "Food", "Health", "Toys"
+    };
+
+    private static final String[] BRANDS = {
+            "Apple", "Samsung", "Nike", "Adidas", "Sony",
+            "LG", "Canon", "Dell", "HP", "Xiaomi", "Huawei", "Lenovo"
+    };
+
+    private static final String[] STATUSES = {
+            "active", "inactive", "pending", "discontinued", "completed", 
"cancelled"
+    };
+
+    private static final String[] REGIONS = {"North", "South", "East", "West", 
"Central"};
+
+    /**
+     * Generates order data in CSV format.
+     *
+     * @param workerID  worker ID (0 for single-threaded), used to offset 
order IDs
+     * @param batchSize number of records to generate
+     * @return CSV string (no header row)
+     */
+    public static String generateOrderCSV(int workerID, int batchSize) {
+        long seed = System.nanoTime() + (long) workerID * 1000;
+        Random rng = new Random(seed);
+        int baseOrderID = workerID * batchSize;
+
+        StringBuilder sb = new StringBuilder(batchSize * 200);
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+        for (int i = 1; i <= batchSize; i++) {
+            int quantity = rng.nextInt(10) + 1;
+            double unitPrice = (rng.nextInt(50000) + 1) / 100.0;
+            double totalAmount = quantity * unitPrice;
+            String productName = "Product_" + 
BRANDS[rng.nextInt(BRANDS.length)] + "_" + rng.nextInt(1000);
+            String category = CATEGORIES[rng.nextInt(CATEGORIES.length)];
+            String brand = BRANDS[rng.nextInt(BRANDS.length)];
+            String status = STATUSES[rng.nextInt(STATUSES.length)];
+            String region = REGIONS[rng.nextInt(REGIONS.length)];
+            long offsetMs = (long) rng.nextInt(365 * 24 * 3600) * 1000L;
+            String orderDate = sdf.format(new Date(System.currentTimeMillis() 
- offsetMs));
+
+            sb.append(baseOrderID + i).append(",")
+              .append(rng.nextInt(100000) + 1).append(",")
+              .append("\"").append(productName).append("\"").append(",")
+              .append(category).append(",")
+              .append(brand).append(",")
+              .append(quantity).append(",")
+              .append(String.format("%.2f", unitPrice)).append(",")
+              .append(String.format("%.2f", totalAmount)).append(",")
+              .append(status).append(",")
+              .append(orderDate).append(",")
+              .append(region).append("\n");
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Generates order data in JSON Lines format (one JSON object per line).
+     *
+     * @param workerID  worker ID (0 for single-threaded)
+     * @param batchSize number of records to generate
+     * @return JSON Lines string
+     */
+    public static String generateOrderJSON(int workerID, int batchSize) {
+        long seed = System.nanoTime() + (long) workerID * 1000;
+        Random rng = new Random(seed);
+        int baseOrderID = workerID * batchSize;
+
+        StringBuilder sb = new StringBuilder(batchSize * 300);
+        SimpleDateFormat sdf = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+        for (int i = 1; i <= batchSize; i++) {
+            int quantity = rng.nextInt(10) + 1;
+            double unitPrice = (rng.nextInt(50000) + 1) / 100.0;
+            double totalAmount = quantity * unitPrice;
+            String productName = "Product_" + 
BRANDS[rng.nextInt(BRANDS.length)] + "_" + rng.nextInt(1000);
+            String category = CATEGORIES[rng.nextInt(CATEGORIES.length)];
+            String brand = BRANDS[rng.nextInt(BRANDS.length)];
+            String status = STATUSES[rng.nextInt(STATUSES.length)];
+            String region = REGIONS[rng.nextInt(REGIONS.length)];
+            long offsetMs = (long) rng.nextInt(365 * 24 * 3600) * 1000L;
+            String orderDate = sdf.format(new Date(System.currentTimeMillis() 
- offsetMs));

Review Comment:
   `generateOrderJSON()` formats timestamps with a literal `'Z'` suffix but 
does not set the formatter timezone to UTC, so the produced strings look like 
UTC while actually representing local time. Either set the `SimpleDateFormat` 
timezone to UTC, or remove the `'Z'` suffix / use a local-time format to avoid 
misleading example data.



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Uses RedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(StreamLoader.class);
+    private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+    private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+    private final HttpClientBuilder httpClientBuilder;
+    private final ObjectMapper objectMapper;
+
+    public StreamLoader() {
+        this.httpClientBuilder = buildHttpClient();
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /** Package-private constructor for testing with a mock HTTP client. */
+    StreamLoader(HttpClientBuilder httpClientBuilder) {
+        this.httpClientBuilder = httpClientBuilder;
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Executes the HTTP PUT request and returns a LoadResponse.
+     *
+     * @throws StreamLoadException for retryable HTTP-level errors (non-200 
status, connection failure)
+     * @throws IOException         for unrecoverable I/O errors
+     */
+    public LoadResponse execute(HttpPut request) throws IOException {

Review Comment:
   The Javadoc says this method can throw `IOException` for unrecoverable I/O 
errors, but the implementation wraps all `IOException` into 
`StreamLoadException` (runtime). Either adjust the Javadoc/signature to reflect 
actual behavior, or allow non-retryable I/O errors (e.g., JSON parse) to 
propagate as `IOException`.
   ```suggestion
        * @throws StreamLoadException for HTTP-level errors (non-200 status, connection failure)
        *                              and unrecoverable I/O errors (including JSON parse failures)
        */
       public LoadResponse execute(HttpPut request) {
   ```



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Uses RedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(StreamLoader.class);
+    private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+    private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+    private final HttpClientBuilder httpClientBuilder;
+    private final ObjectMapper objectMapper;
+
+    public StreamLoader() {
+        this.httpClientBuilder = buildHttpClient();
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /** Package-private constructor for testing with a mock HTTP client. */
+    StreamLoader(HttpClientBuilder httpClientBuilder) {
+        this.httpClientBuilder = httpClientBuilder;
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Executes the HTTP PUT request and returns a LoadResponse.
+     *
+     * @throws StreamLoadException for retryable HTTP-level errors (non-200 
status, connection failure)
+     * @throws IOException         for unrecoverable I/O errors
+     */
+    public LoadResponse execute(HttpPut request) throws IOException {
+        log.debug("Sending HTTP PUT to {}", request.getURI());
+        long start = System.currentTimeMillis();
+
+        try (CloseableHttpClient httpClient = httpClientBuilder.build();
+                CloseableHttpResponse response = httpClient.execute(request)) {
+            int statusCode = response.getStatusLine().getStatusCode();

Review Comment:
   `execute()` builds and closes a new `CloseableHttpClient` for every request. 
This prevents connection pooling/keep-alive reuse and adds significant overhead 
under concurrency. Consider keeping a single `CloseableHttpClient` instance in 
`StreamLoader` (built once from the builder) and closing it in `close()`.



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;

Review Comment:
   `GroupCommitMode` is imported but never referenced by name (the `switch` 
cases resolve from the expression type). This creates an unused import warning 
and can fail builds that enable unused-import checks. Please remove the unused 
import.
   ```suggestion
   
   ```



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Builds HttpPut requests for Doris stream load.
+ * Handles header assembly, label generation, and group commit logic.
+ */
+public class RequestBuilder {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RequestBuilder.class);
+    private static final String STREAM_LOAD_PATTERN = "http://%s/api/%s/%s/_stream_load";
+    private static final Random RANDOM = new Random();
+
+    /**
+     * Builds an HttpPut request for the given config and data.
+     *
+     * @param config  DorisConfig
+     * @param data    pre-buffered (and optionally pre-compressed) request 
body bytes
+     * @param attempt 0 = first attempt, >0 = retry number
+     */
+    public static HttpPut build(DorisConfig config, byte[] data, int attempt) 
throws Exception {
+        String host = pickEndpoint(config.getEndpoints());
+        String url = String.format(STREAM_LOAD_PATTERN, host, 
config.getDatabase(), config.getTable());
+

Review Comment:
   `STREAM_LOAD_PATTERN` hard-codes the `http://` scheme, but `pickEndpoint()` 
strips both `http://` and `https://`. If a user configures `https://...` 
endpoints, requests will still be sent over plain HTTP. Consider preserving the 
endpoint scheme (or parsing the endpoint as a URL and reusing its scheme) when 
building the stream load URL.



##########
sdk/java-doris-sdk/pom.xml:
##########
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.doris</groupId>
+    <artifactId>java-doris-sdk</artifactId>
+    <version>1.0.0</version>
+    <packaging>jar</packaging>
+
+    <name>Apache Doris Java SDK</name>
+    <description>A lightweight Apache Doris Stream Load client (Java version)</description>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <!-- HTTP client -->
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
+
+        <!-- JSON serialization -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.13.5</version>
+        </dependency>
+
+        <!-- Logging facade -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.9</version>
+        </dependency>

Review Comment:
   This module depends on `slf4j-log4j12` (Log4j 1.x binding) as a main 
dependency, which (1) forces a logging backend on all SDK consumers and (2) 
pulls in the EOL Log4j 1.x stack with known security issues. Prefer depending 
on `slf4j-api` only, and add a backend (e.g., `slf4j-simple`/logback) only for 
tests/examples.



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java:
##########
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.RequestBuilder;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Thread-safe Doris stream load client.
+ *
+ * <p>Usage:
+ * <pre>
+ * DorisLoadClient client = new DorisLoadClient(config);
+ * LoadResponse resp = client.load(new ByteArrayInputStream(data));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... }
+ * </pre>
+ *
+ * <p>Thread safety: this instance can be shared across threads.
+ * Each {@link #load} call must receive an independent InputStream.
+ */
+public class DorisLoadClient implements AutoCloseable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(DorisLoadClient.class);
+    /** Absolute maximum for a single backoff interval: 5 minutes. */
+    private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L;
+
+    private final DorisConfig config;
+    private final StreamLoader streamLoader;
+
+    public DorisLoadClient(DorisConfig config) {
+        this.config = config;
+        this.streamLoader = new StreamLoader();
+    }
+
+    /** Package-private constructor for testing with a mock StreamLoader. */
+    DorisLoadClient(DorisConfig config, StreamLoader streamLoader) {
+        this.config = config;
+        this.streamLoader = streamLoader;
+    }
+
+    /**
+     * Loads data from the given InputStream into Doris via stream load.
+     * The InputStream is fully consumed and buffered before the first attempt.
+     * Retries with exponential backoff on retryable errors (network/HTTP 
failures).
+     * Business failures (bad data, schema mismatch, auth) are returned 
immediately without retry.
+     *
+     * @param inputStream data to load (consumed once; must not be shared 
across threads)
+     * @return LoadResponse with status SUCCESS or FAILURE
+     * @throws IOException if the stream cannot be read or all retries are 
exhausted
+     */
+    public LoadResponse load(InputStream inputStream) throws IOException {
+        int maxRetries = 6;
+        long baseIntervalMs = 1000L;
+        long maxTotalTimeMs = 60000L;
+
+        if (config.getRetry() != null) {
+            maxRetries = config.getRetry().getMaxRetryTimes();
+            baseIntervalMs = config.getRetry().getBaseIntervalMs();
+            maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs();
+        }
+
+        log.info("Starting stream load: {}.{}", config.getDatabase(), 
config.getTable());
+
+        // Buffer the InputStream once so retries can replay the body
+        byte[] bodyData = readAll(inputStream);
+
+        // Compress once before the retry loop (avoids re-compressing on each 
retry)
+        if (config.isEnableGzip()) {
+            bodyData = gzipCompress(bodyData);
+        }
+        Exception lastException = null;
+        LoadResponse lastResponse = null;
+        long operationStart = System.currentTimeMillis();
+
+        for (int attempt = 0; attempt <= maxRetries; attempt++) {
+            if (attempt > 0) {
+                log.info("Retry attempt {}/{}", attempt, maxRetries);
+                // Use actual wall-clock elapsed time (includes request time, 
not just sleep time)
+                long elapsed = System.currentTimeMillis() - operationStart;
+                long backoff = calculateBackoffMs(attempt, baseIntervalMs, 
maxTotalTimeMs, elapsed);
+
+                if (maxTotalTimeMs > 0 && elapsed + backoff > maxTotalTimeMs) {
+                    log.warn("Next retry backoff ({}ms) would exceed total 
limit ({}ms). Stopping.", backoff, maxTotalTimeMs);
+                    break;
+                }
+
+                log.info("Waiting {}ms before retry (elapsed so far: {}ms)", 
backoff, elapsed);
+                sleep(backoff);
+            } else {
+                log.info("Initial load attempt");
+            }
+
+            try {
+                HttpPut request = RequestBuilder.build(config, bodyData, 
attempt);
+                lastResponse = streamLoader.execute(request);
+
+                if (lastResponse.getStatus() == LoadResponse.Status.SUCCESS) {
+                    log.info("Stream load succeeded on attempt {}", attempt + 
1);
+                    return lastResponse;
+                }
+
+                // Business failure (bad data, schema mismatch, auth) — do not 
retry
+                log.error("Load failed (non-retryable): {}", 
lastResponse.getErrorMessage());
+                return lastResponse;
+
+            } catch (StreamLoadException e) {
+                // Retryable: network error, HTTP 5xx, etc.
+                lastException = e;
+                log.error("Attempt {} failed with retryable error: ", attempt 
+ 1, e);
+
+                // Check elapsed wall-clock time (same guard as the Exception 
branch below)
+                long elapsed = System.currentTimeMillis() - operationStart;
+                if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) {
+                    log.warn("Total elapsed time ({}ms) exceeded limit ({}ms), 
stopping retries.", elapsed, maxTotalTimeMs);
+                    break;
+                }
+            } catch (Exception e) {
+                // Wrap unexpected exceptions as retryable
+                lastException = new StreamLoadException("Unexpected error 
building request: " + e.getMessage(), e);
+                log.error("Attempt {} failed with unexpected error: ", attempt 
+ 1, e);
+
+                // Check elapsed wall-clock time
+                long elapsed = System.currentTimeMillis() - operationStart;
+                if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) {
+                    log.warn("Total elapsed time ({}ms) exceeded limit ({}ms), 
stopping retries.", elapsed, maxTotalTimeMs);
+                    break;
+                }
+            }
+        }
+
+        log.debug("Total operation time: {}ms", System.currentTimeMillis() - 
operationStart);
+
+        if (lastException != null) {
+            throw new IOException("Stream load failed after " + (maxRetries + 
1) + " attempts", lastException);
+        }
+
+        if (lastResponse != null) {
+            return lastResponse;
+        }
+
+        throw new IOException("Stream load failed: unknown error");
+    }
+
+    /**
+     * Calculates exponential backoff interval in milliseconds.
+     * Package-private for unit testing.
+     *
+     * Formula: base * 2^(attempt-1), constrained by remaining total time and 
absolute max.
+     *
+     * @param elapsedMs actual wall-clock time elapsed since the operation 
started (includes request time)
+     */
+    static long calculateBackoffMs(int attempt, long baseIntervalMs, long 
maxTotalTimeMs, long elapsedMs) {
+        if (attempt <= 0) return 0;
+        long intervalMs = baseIntervalMs * (1L << (attempt - 1)); // base * 
2^(attempt-1)
+
+        if (maxTotalTimeMs > 0) {
+            long remaining = maxTotalTimeMs - elapsedMs - 5000; // reserve 5s 
for the next request
+            if (remaining <= 0) {
+                intervalMs = 0;
+            } else if (intervalMs > remaining) {
+                intervalMs = remaining;
+            }
+        }
+
+        if (intervalMs > ABSOLUTE_MAX_INTERVAL_MS) intervalMs = 
ABSOLUTE_MAX_INTERVAL_MS;
+        if (intervalMs < 0) intervalMs = 0;
+        return intervalMs;

Review Comment:
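   `calculateBackoffMs` computes `baseIntervalMs * (1L << (attempt - 1))`, 
which overflows for large `attempt` values. The multiplication exceeds 
`Long.MAX_VALUE` well before the shift itself does (around attempt 55 with a 
1000 ms base). Java also masks `long` shift distances to their low 6 bits, so 
for `attempt - 1 >= 64` the shift silently wraps around. The result is either a 
negative interval (clamped to 0, i.e. no backoff at all) or a small wrapped 
value, breaking backoff behavior. Consider capping the exponent (e.g., 
`Math.min(attempt - 1, 30)`), or using a safe multiply loop/`Math.multiplyExact` 
with overflow handling.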



##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Uses RedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(StreamLoader.class);
+    private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+    private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+    private final HttpClientBuilder httpClientBuilder;
+    private final ObjectMapper objectMapper;
+
+    public StreamLoader() {
+        this.httpClientBuilder = buildHttpClient();
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /** Package-private constructor for testing with a mock HTTP client. */
+    StreamLoader(HttpClientBuilder httpClientBuilder) {
+        this.httpClientBuilder = httpClientBuilder;
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Executes the HTTP PUT request and returns a LoadResponse.
+     *
+     * @throws StreamLoadException for retryable HTTP-level errors (non-200 
status, connection failure)
+     * @throws IOException         for unrecoverable I/O errors
+     */
+    public LoadResponse execute(HttpPut request) throws IOException {
+        log.debug("Sending HTTP PUT to {}", request.getURI());
+        long start = System.currentTimeMillis();
+
+        try (CloseableHttpClient httpClient = httpClientBuilder.build();
+                CloseableHttpResponse response = httpClient.execute(request)) {
+            int statusCode = response.getStatusLine().getStatusCode();
+            log.debug("HTTP response status: {}", statusCode);
+            log.debug("HTTP request completed in {} ms", 
System.currentTimeMillis() - start);
+
+            if (statusCode == 200) {
+                return parseResponse(response);
+            } else {
+                // Non-200 is retryable (e.g. 503, 429)
+                throw new StreamLoadException("stream load error: " + 
response.getStatusLine().toString());
+            }
+        } catch (StreamLoadException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new StreamLoadException("stream load request failed: " + 
e.getMessage(), e);
+        }
+    }
+
+    private LoadResponse parseResponse(CloseableHttpResponse response) throws 
IOException {
+        byte[] bodyBytes = EntityUtils.toByteArray(response.getEntity());
+        String body = new String(bodyBytes, StandardCharsets.UTF_8);
+        log.info("Stream Load Response: {}", body);
+
+        RespContent resp = objectMapper.readValue(body, RespContent.class);
+
+        if (isSuccess(resp.getStatus())) {
+            log.info("Load operation completed successfully");
+            return LoadResponse.success(resp);
+        } else {
+            log.error("Load operation failed with status: {}", 
resp.getStatus());
+            String errorMsg;
+            if (resp.getMessage() != null && !resp.getMessage().isEmpty()) {
+                errorMsg = "load failed. cause by: " + resp.getMessage()
+                        + ", please check more detail from url: " + 
resp.getErrorUrl();
+            } else {
+                errorMsg = body;
+            }
+            return LoadResponse.failure(resp, errorMsg);
+        }
+    }
+
+    private static boolean isSuccess(String status) {
+        return "success".equalsIgnoreCase(status);
+    }
+
+    private static HttpClientBuilder buildHttpClient() {
+        try {
+            RequestConfig requestConfig = RequestConfig.custom()
+                    .setConnectTimeout(CONNECT_TIMEOUT_MS)
+                    .setSocketTimeout(SOCKET_TIMEOUT_MS)
+                    .setConnectionRequestTimeout(CONNECT_TIMEOUT_MS)
+                    .build();
+
+            return HttpClientBuilder.create()
+                    .setDefaultRequestConfig(requestConfig)
+                    .setRedirectStrategy(new DefaultRedirectStrategy() {
+                        @Override
+                        protected boolean isRedirectable(String method) {
+                            return true;
+                        }
+                    })
+                    .setSSLSocketFactory(
+                            new SSLConnectionSocketFactory(
+                                    SSLContextBuilder.create()
+                                            .loadTrustMaterial(null, (chain, 
authType) -> true)
+                                            .build(),
+                                    NoopHostnameVerifier.INSTANCE));

Review Comment:
   `buildHttpClient()` configures an SSL context that trusts all certificates 
and disables hostname verification. This effectively removes TLS security (MITM 
is possible) for any https endpoint. Prefer using the JVM default trust 
store/hostname verifier, or make an explicit `insecureSkipVerify`-style option 
that is off by default and clearly documented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to