This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 73890b176ba [Enhancement](sdk) Add java SDK as a subproject under
sdk/java-doris-sdk (#61626)
73890b176ba is described below
commit 73890b176bac65b666ca9456d186baae0e71b6c5
Author: wudi <[email protected]>
AuthorDate: Tue Mar 24 14:33:12 2026 +0800
[Enhancement](sdk) Add java SDK as a subproject under sdk/java-doris-sdk
(#61626)
### What problem does this PR solve?
The previous pull request (https://github.com/apache/doris/pull/58404)
supported the Go SDK; this time, a Java version will be added based on
it.
---
sdk/java-doris-sdk/README.md | 310 +++++++++++++++++++++
sdk/java-doris-sdk/pom.xml | 85 ++++++
.../java/org/apache/doris/sdk/DorisClient.java | 82 ++++++
.../doris/sdk/examples/ConcurrentExample.java | 190 +++++++++++++
.../apache/doris/sdk/examples/DataGenerator.java | 148 ++++++++++
.../apache/doris/sdk/examples/ExamplesMain.java | 75 +++++
.../org/apache/doris/sdk/examples/GzipExample.java | 79 ++++++
.../org/apache/doris/sdk/examples/JsonExample.java | 97 +++++++
.../doris/sdk/examples/SimpleConfigExample.java | 81 ++++++
.../doris/sdk/examples/SingleBatchExample.java | 91 ++++++
.../org/apache/doris/sdk/load/DorisLoadClient.java | 227 +++++++++++++++
.../apache/doris/sdk/load/config/CsvFormat.java | 52 ++++
.../apache/doris/sdk/load/config/DorisConfig.java | 146 ++++++++++
.../org/apache/doris/sdk/load/config/Format.java | 32 +++
.../doris/sdk/load/config/GroupCommitMode.java | 31 +++
.../apache/doris/sdk/load/config/JsonFormat.java | 63 +++++
.../apache/doris/sdk/load/config/RetryConfig.java | 66 +++++
.../sdk/load/exception/StreamLoadException.java | 33 +++
.../doris/sdk/load/internal/RequestBuilder.java | 166 +++++++++++
.../doris/sdk/load/internal/StreamLoader.java | 149 ++++++++++
.../apache/doris/sdk/load/model/LoadResponse.java | 56 ++++
.../apache/doris/sdk/load/model/RespContent.java | 92 ++++++
.../src/main/resources/log4j.properties | 23 ++
.../apache/doris/sdk/load/DorisLoadClientTest.java | 129 +++++++++
.../doris/sdk/load/config/DorisConfigTest.java | 112 ++++++++
.../apache/doris/sdk/load/config/FormatTest.java | 64 +++++
.../sdk/load/internal/RequestBuilderTest.java | 113 ++++++++
.../doris/sdk/load/model/LoadResponseTest.java | 78 ++++++
28 files changed, 2870 insertions(+)
diff --git a/sdk/java-doris-sdk/README.md b/sdk/java-doris-sdk/README.md
new file mode 100644
index 00000000000..a018766ce13
--- /dev/null
+++ b/sdk/java-doris-sdk/README.md
@@ -0,0 +1,310 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# 🚀 Doris Java SDK
+
+[Java 8+](https://www.oracle.com/java/)
+[Thread-safe](#-thread-safety)
+
+A lightweight Java stream load client for Apache Doris — easy to use, high
performance, and production-ready. Maintained by the Apache Doris core
contributor team.
+
+## ✨ Features
+
+**Easy to Use**: Clean Builder API that encapsulates HTTP configuration,
multi-format support, and intelligent retry logic.
+
+**High Performance**: Built-in efficient concurrency and batch loading best
practices — buffer once, retry multiple times, compress once for Gzip.
+
+**Production Ready**: Battle-tested in large-scale, high-pressure production
environments with full observability.
+
+## 📦 Installation
+
+> **Note**: This SDK has not yet been published to Maven Central. You need to
build and install it locally first.
+
+### Step 1: Build and install locally
+
+```bash
+git clone https://github.com/apache/doris.git
+cd doris/sdk/java-doris-sdk
+mvn install -DskipTests
+```
+
+### Step 2: Add the dependency to your `pom.xml`
+
+```xml
+<dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-doris-sdk</artifactId>
+ <version>1.0.0</version>
+</dependency>
+```
+
+## 🚀 Quick Start
+
+### CSV Load
+
+```java
+DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+ .user("root")
+ .password("password")
+ .database("test_db")
+ .table("users")
+ .format(DorisConfig.defaultCsvFormat())
+ .retry(DorisConfig.defaultRetry())
+ .groupCommit(GroupCommitMode.ASYNC)
+ .build();
+
+try (DorisLoadClient client = DorisClient.newClient(config)) {
+ String data = "1,Alice,25\n2,Bob,30\n3,Charlie,35";
+ LoadResponse response = client.load(DorisClient.stringStream(data));
+
+ if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ System.out.println("Loaded rows: " +
response.getRespContent().getNumberLoadedRows());
+ }
+}
+```
+
+### JSON Load
+
+```java
+DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+ .user("root")
+ .password("password")
+ .database("test_db")
+ .table("users")
+ .format(DorisConfig.defaultJsonFormat()) // JSON Lines format
+ .retry(DorisConfig.defaultRetry())
+ .groupCommit(GroupCommitMode.ASYNC)
+ .build();
+
+try (DorisLoadClient client = DorisClient.newClient(config)) {
+ String jsonData = "{\"id\":1,\"name\":\"Alice\",\"age\":25}\n"
+ + "{\"id\":2,\"name\":\"Bob\",\"age\":30}\n"
+ + "{\"id\":3,\"name\":\"Charlie\",\"age\":35}\n";
+
+ LoadResponse response = client.load(DorisClient.stringStream(jsonData));
+}
+```
+
+## 🛠️ Configuration
+
+### Basic Configuration
+
+```java
+DorisConfig config = DorisConfig.builder()
+ // Required fields
+ .endpoints(Arrays.asList(
+ "http://fe1:8030",
+ "http://fe2:8030" // Multiple FE nodes supported, randomly load-balanced
+ ))
+ .user("your_username")
+ .password("your_password")
+ .database("your_database")
+ .table("your_table")
+
+ // Optional fields
+ .labelPrefix("my_app") // label prefix
+ .label("custom_label_001") // custom label
+ .format(DorisConfig.defaultCsvFormat())
+ .retry(DorisConfig.defaultRetry())
+ .groupCommit(GroupCommitMode.ASYNC)
+ .options(new HashMap<String, String>() {{
+ put("timeout", "3600");
+ put("max_filter_ratio", "0.1");
+ put("strict_mode", "true");
+ }})
+ .build();
+```
+
+### Data Format
+
+```java
+// 1. Use default formats (recommended)
+DorisConfig.defaultJsonFormat() // JSON Lines, read_json_by_line=true
+DorisConfig.defaultCsvFormat() // CSV, comma-separated, \n line delimiter
+
+// 2. Custom JSON format
+new JsonFormat(JsonFormat.Type.OBJECT_LINE) // JSON Lines
+new JsonFormat(JsonFormat.Type.ARRAY) // JSON Array
+
+// 3. Custom CSV format
+new CsvFormat("|", "\\n") // pipe-separated
+```
+
+### Retry Configuration
+
+```java
+// 1. Default retry (recommended)
+DorisConfig.defaultRetry() // 6 retries, total time limit 60s
+// Backoff sequence: 1s, 2s, 4s, 8s, 16s, 32s
+
+// 2. Custom retry
+RetryConfig.builder()
+ .maxRetryTimes(3) // max 3 retries
+ .baseIntervalMs(2000) // base interval 2 seconds
+ .maxTotalTimeMs(30000) // total time limit 30 seconds
+ .build()
+
+// 3. Disable retry
+.retry(null)
+```
+
+### Group Commit Mode
+
+```java
+GroupCommitMode.ASYNC // async mode, highest throughput
+GroupCommitMode.SYNC // sync mode, immediately visible after return
+GroupCommitMode.OFF // disabled, use traditional stream load
+```
+
+> ⚠️ **Note**: When Group Commit is enabled, all label configuration is
automatically ignored and a warning is logged.
+
+### Gzip Compression
+
+```java
+DorisConfig config = DorisConfig.builder()
+ // ... other config
+ .enableGzip(true) // SDK compresses the request body automatically
+ .build();
+```
+
+> The SDK transparently compresses the request body before sending and sets
the `compress_type=gz` header automatically. Compression runs only once and is
reused across retries.
+
+## 🔄 Concurrent Usage
+
+### Basic Concurrent Example
+
+```java
+DorisLoadClient client = DorisClient.newClient(config); // thread-safe, share across threads
+
+ExecutorService executor = Executors.newFixedThreadPool(10);
+for (int i = 0; i < 10; i++) {
+ final int workerId = i;
+ executor.submit(() -> {
+ // Each thread uses its own independent data
+ String data = generateWorkerData(workerId);
+ LoadResponse response = client.load(DorisClient.stringStream(data));
+
+ if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ System.out.printf("Worker %d loaded %d rows%n",
+ workerId, response.getRespContent().getNumberLoadedRows());
+ }
+ });
+}
+```
+
+### ⚠️ Thread Safety
+
+```java
+// ✅ Correct: DorisLoadClient is thread-safe, share one instance across threads
+DorisLoadClient client = DorisClient.newClient(config);
+for (int i = 0; i < 10; i++) {
+ executor.submit(() -> {
+ String data = generateData(); // each thread has its own data
+ client.load(DorisClient.stringStream(data));
+ });
+}
+
+// ❌ Wrong: never share the same InputStream across threads
+InputStream sharedStream = new FileInputStream("data.csv");
+for (int i = 0; i < 10; i++) {
+ executor.submit(() -> client.load(sharedStream)); // concurrent reads on shared stream cause data corruption
+}
+```
+
+## 📊 Response Handling
+
+```java
+LoadResponse response = client.load(data);
+
+if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ RespContent resp = response.getRespContent();
+ System.out.println("Load succeeded!");
+ System.out.println("Loaded rows: " + resp.getNumberLoadedRows());
+ System.out.println("Load bytes: " + resp.getLoadBytes());
+ System.out.println("Load time (ms): " + resp.getLoadTimeMs());
+ System.out.println("Label: " + resp.getLabel());
+} else {
+ System.out.println("Load failed: " + response.getErrorMessage());
+
+ // Get detailed error info
+ if (response.getRespContent().getErrorUrl() != null) {
+ System.out.println("Error detail: " +
response.getRespContent().getErrorUrl());
+ }
+}
+```
+
+## 🛠️ Utilities
+
+### Stream Conversion Helpers
+
+```java
+// String to InputStream
+InputStream stream = DorisClient.stringStream("1,Alice,25\n2,Bob,30");
+
+// byte array to InputStream
+byte[] data = ...;
+InputStream stream = DorisClient.bytesStream(data);
+
+// Serialize objects to JSON InputStream (uses Jackson)
+List<User> users = Arrays.asList(new User(1, "Alice"), new User(2, "Bob"));
+InputStream stream = DorisClient.jsonStream(users);
+```
+
+### Default Config Builders
+
+```java
+DorisConfig.defaultRetry() // 6 retries, 60s total time
+DorisConfig.defaultJsonFormat() // JSON Lines format
+DorisConfig.defaultCsvFormat() // standard CSV format
+
+RetryConfig.builder()
+ .maxRetryTimes(3)
+ .baseIntervalMs(1000)
+ .maxTotalTimeMs(30000)
+ .build()
+```
+
+## 📈 Production Examples
+
+Full production-grade examples are available under
`src/main/java/org/apache/doris/sdk/examples/`:
+
+```bash
+# Build the SDK jar and copy its runtime dependencies into target/dependency
+# (the jar is not shaded, so httpclient, jackson and slf4j must be on the classpath)
+mvn package -DskipTests dependency:copy-dependencies -DincludeScope=runtime
+
+# Classpath used below (use ';' instead of ':' on Windows)
+CP="target/java-doris-sdk-1.0.0.jar:target/dependency/*"
+
+# Run all examples
+java -cp "$CP" org.apache.doris.sdk.examples.ExamplesMain all
+
+# Run individual examples
+java -cp "$CP" org.apache.doris.sdk.examples.ExamplesMain simple      # basic JSON load
+java -cp "$CP" org.apache.doris.sdk.examples.ExamplesMain single      # large batch load (100k records)
+java -cp "$CP" org.apache.doris.sdk.examples.ExamplesMain json        # production JSON load (50k records)
+java -cp "$CP" org.apache.doris.sdk.examples.ExamplesMain concurrent  # concurrent load (1M records, 10 threads)
+java -cp "$CP" org.apache.doris.sdk.examples.ExamplesMain gzip        # gzip compressed load
+```
+
+## 🤝 Contributing
+
+Pull requests are welcome!
+
+## 🙏 Acknowledgements
+
+Maintained by the Apache Doris core contributor team.
diff --git a/sdk/java-doris-sdk/pom.xml b/sdk/java-doris-sdk/pom.xml
new file mode 100644
index 00000000000..08b10c96925
--- /dev/null
+++ b/sdk/java-doris-sdk/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-doris-sdk</artifactId>
+ <version>1.0.0</version>
+ <packaging>jar</packaging>
+
+ <name>Apache Doris Java SDK</name>
+ <description>A lightweight Apache Doris Stream Load client (Java
version)</description>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <!-- HTTP client -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
+
+ <!-- JSON serialization -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.13.5</version>
+ </dependency>
+
+ <!-- Logging: SLF4J binding for log4j 1.2 (transitively brings in slf4j-api and log4j) -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.9</version>
+ </dependency>
+
+ <!-- Test: JUnit 5 + Mockito -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <version>5.10.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>4.11.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.2.5</version>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/DorisClient.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/DorisClient.java
new file mode 100644
index 00000000000..532cc3d29b2
--- /dev/null
+++ b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/DorisClient.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Static facade of the Apache Doris Java SDK: creates load clients and offers
+ * small helpers that turn in-memory data into {@link InputStream}s.
+ *
+ * <p>This class only exposes static methods and cannot be instantiated.
+ *
+ * <pre>
+ * // CSV example
+ * DorisConfig config = DorisConfig.builder()
+ *     .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+ *     .user("root").password("password")
+ *     .database("test_db").table("users")
+ *     .format(DorisConfig.defaultCsvFormat())
+ *     .retry(DorisConfig.defaultRetry())
+ *     .groupCommit(GroupCommitMode.ASYNC)
+ *     .build();
+ *
+ * DorisLoadClient client = DorisClient.newClient(config);
+ * LoadResponse resp = client.load(DorisClient.stringStream("1,Alice,25\n2,Bob,30"));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) {
+ *     System.out.println("Loaded rows: " + resp.getRespContent().getNumberLoadedRows());
+ * }
+ * </pre>
+ */
+public class DorisClient {
+
+    /** Shared Jackson mapper used by {@link #jsonStream(Object)}. */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /** Not instantiable: static helpers only. */
+    private DorisClient() {}
+
+    /**
+     * Builds a {@link DorisLoadClient} for the supplied configuration.
+     * The SDK documents the returned client as thread-safe; create it once and
+     * reuse it for every load call instead of creating one per request.
+     *
+     * @param config connection and load settings handed to the client
+     * @return a new load client backed by {@code config}
+     */
+    public static DorisLoadClient newClient(DorisConfig config) {
+        return new DorisLoadClient(config);
+    }
+
+    /**
+     * Exposes a string as an in-memory stream of its UTF-8 bytes.
+     *
+     * @param data text to encode; must not be {@code null}
+     * @return a stream over the UTF-8 encoding of {@code data}
+     */
+    public static InputStream stringStream(String data) {
+        byte[] utf8 = data.getBytes(StandardCharsets.UTF_8);
+        return new ByteArrayInputStream(utf8);
+    }
+
+    /**
+     * Exposes a byte array as an in-memory stream. The array is wrapped, not copied.
+     *
+     * @param data bytes to read from
+     * @return a stream over {@code data}
+     */
+    public static InputStream bytesStream(byte[] data) {
+        return new ByteArrayInputStream(data);
+    }
+
+    /**
+     * Serializes {@code data} to JSON with Jackson and exposes the result as a stream.
+     *
+     * @param data object (or collection of objects) to serialize
+     * @return a stream over the JSON bytes
+     * @throws IOException if Jackson cannot serialize {@code data}
+     */
+    public static InputStream jsonStream(Object data) throws IOException {
+        return new ByteArrayInputStream(OBJECT_MAPPER.writeValueAsBytes(data));
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ConcurrentExample.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ConcurrentExample.java
new file mode 100644
index 00000000000..ac12dbceb69
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ConcurrentExample.java
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.config.RetryConfig;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Production-level concurrent large-scale data loading demo.
+ * Loads 1,000,000 records using 10 concurrent threads, each loading 100,000 records.
+ *
+ * <p>Thread safety: a single DorisLoadClient is shared across all workers, while each
+ * worker builds its own independent InputStream — never share streams across threads.
+ *
+ * <p>Mirrors Go SDK's RunConcurrentExample.
+ */
+public class ConcurrentExample {
+
+    private static final Logger log = LoggerFactory.getLogger(ConcurrentExample.class);
+
+    private static final int TOTAL_RECORDS = 1_000_000;
+    private static final int NUM_WORKERS = 10;
+    private static final int RECORDS_PER_WORKER = TOTAL_RECORDS / NUM_WORKERS;
+
+    /** Immutable outcome of one worker's load attempt. */
+    static class WorkerResult {
+        final int workerId;
+        final boolean success;
+        final int recordsLoaded;
+        final long dataSizeBytes;
+        final long loadTimeMs;
+        /** Failure description, or {@code null} when the load succeeded. */
+        final String error;
+
+        WorkerResult(int workerId, boolean success, int recordsLoaded,
+                     long dataSizeBytes, long loadTimeMs, String error) {
+            this.workerId = workerId;
+            this.success = success;
+            this.recordsLoaded = recordsLoaded;
+            this.dataSizeBytes = dataSizeBytes;
+            this.loadTimeMs = loadTimeMs;
+            this.error = error;
+        }
+    }
+
+    /**
+     * Runs the demo: submits {@link #NUM_WORKERS} load tasks that share one client,
+     * releases them together, waits for all of them and prints an aggregate summary.
+     * Failures are reported on stderr; this method does not throw.
+     */
+    public static void run() {
+        System.out.println("=== Production-Level Concurrent Large-Scale Loading Demo ===");
+        System.out.printf("Scale: %d total records, %d workers, %d records per worker%n",
+                TOTAL_RECORDS, NUM_WORKERS, RECORDS_PER_WORKER);
+
+        // Single shared client — thread-safe
+        DorisConfig config = DorisConfig.builder()
+                .endpoints(Arrays.asList("http://localhost:8030"))
+                .user("root")
+                .password("")
+                .database("test")
+                .table("orders")
+                .labelPrefix("prod_concurrent")
+                .format(DorisConfig.defaultCsvFormat())
+                .retry(RetryConfig.builder().maxRetryTimes(5).baseIntervalMs(1000).maxTotalTimeMs(60000).build())
+                .groupCommit(GroupCommitMode.ASYNC)
+                .build();
+
+        try (DorisLoadClient client = DorisClient.newClient(config)) {
+            System.out.println("Load client created successfully");
+
+            AtomicLong totalRecordsLoaded = new AtomicLong(0);
+            AtomicLong totalDataBytes = new AtomicLong(0);
+            AtomicLong successWorkers = new AtomicLong(0);
+            AtomicLong failedWorkers = new AtomicLong(0);
+
+            ExecutorService executor = Executors.newFixedThreadPool(NUM_WORKERS);
+            // The inner try/finally stops the pool before the client is closed, and
+            // also on the failure path, so no non-daemon worker thread is leaked.
+            try {
+                List<Future<WorkerResult>> futures = new ArrayList<>(NUM_WORKERS);
+                CountDownLatch startLatch = new CountDownLatch(1);
+
+                System.out.printf("Launching %d concurrent workers...%n", NUM_WORKERS);
+                for (int i = 0; i < NUM_WORKERS; i++) {
+                    final int workerId = i;
+                    futures.add(executor.submit(() -> {
+                        startLatch.await(); // all workers start simultaneously
+                        return runWorker(workerId, client);
+                    }));
+                }
+
+                // Start the clock at the moment the workers are released so that the
+                // reported throughput only covers actual work.
+                long overallStart = System.currentTimeMillis();
+                startLatch.countDown();
+
+                // Collect results
+                for (Future<WorkerResult> future : futures) {
+                    WorkerResult result = future.get();
+                    if (result.success) {
+                        totalRecordsLoaded.addAndGet(result.recordsLoaded);
+                        totalDataBytes.addAndGet(result.dataSizeBytes);
+                        successWorkers.incrementAndGet();
+                    } else {
+                        failedWorkers.incrementAndGet();
+                    }
+                }
+
+                long overallTimeMs = System.currentTimeMillis() - overallStart;
+                double overallTimeSec = overallTimeMs / 1000.0;
+                // Guard against a zero elapsed time so the rate never prints as Infinity/NaN.
+                double recordsPerSec = overallTimeSec > 0
+                        ? totalRecordsLoaded.get() / overallTimeSec
+                        : 0.0;
+
+                // Summary
+                System.out.println("\n=== CONCURRENT LOAD COMPLETE ===");
+                System.out.printf("Total records loaded: %d/%d%n",
+                        totalRecordsLoaded.get(), TOTAL_RECORDS);
+                System.out.printf("Workers: %d successful, %d failed%n",
+                        successWorkers.get(), failedWorkers.get());
+                System.out.printf("Total time: %.2f s%n", overallTimeSec);
+                System.out.printf("Overall rate: %.0f records/sec%n", recordsPerSec);
+                System.out.printf("Data processed: %.1f MB%n",
+                        totalDataBytes.get() / 1024.0 / 1024.0);
+            } finally {
+                // Every future is already complete on the success path, so this is a
+                // plain shutdown there; on failure it also interrupts running workers.
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can observe the interruption.
+            Thread.currentThread().interrupt();
+            System.err.println("Concurrent load failed: interrupted");
+        } catch (Exception e) {
+            System.err.println("Concurrent load failed: " + e.getMessage());
+        }
+
+        System.out.println("=== Demo Complete ===");
+    }
+
+    /**
+     * Generates this worker's CSV batch and loads it through the shared client.
+     *
+     * @param workerId index of the worker, also used to offset the generated order IDs
+     * @param client   shared load client
+     * @return the outcome of the load; never {@code null}, failures are captured in the result
+     */
+    private static WorkerResult runWorker(int workerId, DorisLoadClient client) {
+        log.info("Worker-{} starting load of {} records", workerId, RECORDS_PER_WORKER);
+
+        // Each worker generates its own independent data — never share streams across threads
+        String data = DataGenerator.generateOrderCSV(workerId, RECORDS_PER_WORKER);
+        // Character count; equals the UTF-8 byte count as long as the generated CSV is ASCII-only.
+        long dataSizeBytes = data.length();
+
+        // Taken outside the try block so success and failure paths time the same interval.
+        long loadStart = System.currentTimeMillis();
+        try {
+            InputStream inputStream = DorisClient.stringStream(data);
+            LoadResponse response = client.load(inputStream);
+            long loadTimeMs = System.currentTimeMillis() - loadStart;
+
+            if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+                log.info("Worker-{} completed: {} records in {} ms",
+                        workerId, RECORDS_PER_WORKER, loadTimeMs);
+                return new WorkerResult(workerId, true, RECORDS_PER_WORKER,
+                        dataSizeBytes, loadTimeMs, null);
+            }
+            log.error("Worker-{} failed: {}", workerId, response.getErrorMessage());
+            return new WorkerResult(workerId, false, 0, dataSizeBytes, loadTimeMs,
+                    response.getErrorMessage());
+        } catch (Exception e) {
+            long loadTimeMs = System.currentTimeMillis() - loadStart;
+            // Trailing throwable argument makes SLF4J log the stack trace as well.
+            log.error("Worker-{} error: {}", workerId, e.getMessage(), e);
+            return new WorkerResult(workerId, false, 0, dataSizeBytes, loadTimeMs,
+                    e.getMessage());
+        }
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java
new file mode 100644
index 00000000000..04b5b6c316e
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.Random;
+import java.util.TimeZone;
+
+/**
+ * Random order-data generator shared by all Doris Stream Load examples, so that
+ * every example targets the same {@code orders} schema.
+ *
+ * <p>Numbers and dates are formatted with {@link Locale#ROOT}; the output therefore
+ * uses ASCII digits and a {@code '.'} decimal separator whatever the JVM default
+ * locale is, which keeps the CSV column count and the JSON syntax valid.
+ *
+ * <p>Table DDL (orders):
+ * <pre>
+ * CREATE TABLE orders (
+ *     order_id     BIGINT,
+ *     customer_id  BIGINT,
+ *     product_name VARCHAR(200),
+ *     category     VARCHAR(50),
+ *     brand        VARCHAR(50),
+ *     quantity     INT,
+ *     unit_price   DECIMAL(10,2),
+ *     total_amount DECIMAL(10,2),
+ *     status       VARCHAR(20),
+ *     order_date   DATETIME,
+ *     region       VARCHAR(20)
+ * ) DISTRIBUTED BY HASH(order_id) BUCKETS 10
+ * PROPERTIES (
+ *     "replication_allocation" = "tag.location.default: 1"
+ * );
+ * </pre>
+ */
+public class DataGenerator {
+
+    private static final String[] CATEGORIES = {
+        "Electronics", "Clothing", "Books", "Home", "Sports",
+        "Beauty", "Automotive", "Food", "Health", "Toys"
+    };
+
+    private static final String[] BRANDS = {
+        "Apple", "Samsung", "Nike", "Adidas", "Sony",
+        "LG", "Canon", "Dell", "HP", "Xiaomi", "Huawei", "Lenovo"
+    };
+
+    private static final String[] STATUSES = {
+        "active", "inactive", "pending", "discontinued", "completed", "cancelled"
+    };
+
+    private static final String[] REGIONS = {"North", "South", "East", "West", "Central"};
+
+    /** Upper bound (in chars) for the StringBuilder pre-size hint. */
+    private static final int MAX_INITIAL_CAPACITY = 1 << 26;
+
+    /**
+     * Generates order data in CSV format.
+     *
+     * <p>Column order matches the DDL in the class comment. {@code product_name} is
+     * wrapped in double quotes; {@code order_date} is rendered as
+     * {@code yyyy-MM-dd HH:mm:ss} in the JVM default time zone.
+     *
+     * @param workerID  worker ID (0 for single-threaded), used to offset order IDs
+     * @param batchSize number of records to generate; values &lt;= 0 yield an empty string
+     * @return CSV string (no header row), one {@code '\n'}-terminated line per record
+     */
+    public static String generateOrderCSV(int workerID, int batchSize) {
+        Random rng = newRandom(workerID);
+        // Widen before multiplying: order_id is BIGINT and the int product can overflow.
+        long baseOrderID = (long) workerID * batchSize;
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ROOT);
+        StringBuilder sb = new StringBuilder(initialCapacity(batchSize, 200));
+
+        for (int i = 1; i <= batchSize; i++) {
+            Order order = new Order(rng, baseOrderID + i, sdf);
+            sb.append(order.orderId).append(',')
+                    .append(order.customerId).append(',')
+                    .append('"').append(order.productName).append('"').append(',')
+                    .append(order.category).append(',')
+                    .append(order.brand).append(',')
+                    .append(order.quantity).append(',')
+                    .append(String.format(Locale.ROOT, "%.2f", order.unitPrice)).append(',')
+                    .append(String.format(Locale.ROOT, "%.2f", order.totalAmount)).append(',')
+                    .append(order.status).append(',')
+                    .append(order.orderDate).append(',')
+                    .append(order.region).append('\n');
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Generates order data in JSON Lines format (one JSON object per line).
+     *
+     * <p>{@code order_date} is rendered as {@code yyyy-MM-dd'T'HH:mm:ss'Z'} in UTC,
+     * so the trailing {@code Z} designator matches the formatted instant.
+     *
+     * @param workerID  worker ID (0 for single-threaded), used to offset order IDs
+     * @param batchSize number of records to generate; values &lt;= 0 yield an empty string
+     * @return JSON Lines string, one {@code '\n'}-terminated object per record
+     */
+    public static String generateOrderJSON(int workerID, int batchSize) {
+        Random rng = newRandom(workerID);
+        long baseOrderID = (long) workerID * batchSize;
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT);
+        // The pattern hard-codes a 'Z' suffix, so the value itself must be in UTC.
+        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+        StringBuilder sb = new StringBuilder(initialCapacity(batchSize, 300));
+
+        for (int i = 1; i <= batchSize; i++) {
+            Order order = new Order(rng, baseOrderID + i, sdf);
+            sb.append(String.format(Locale.ROOT,
+                    "{\"order_id\":%d,\"customer_id\":%d,\"product_name\":\"%s\","
+                            + "\"category\":\"%s\",\"brand\":\"%s\",\"quantity\":%d,"
+                            + "\"unit_price\":%.2f,\"total_amount\":%.2f,"
+                            + "\"status\":\"%s\",\"order_date\":\"%s\",\"region\":\"%s\"}",
+                    order.orderId,
+                    order.customerId,
+                    order.productName, order.category, order.brand, order.quantity,
+                    order.unitPrice, order.totalAmount, order.status, order.orderDate,
+                    order.region));
+            sb.append('\n');
+        }
+        return sb.toString();
+    }
+
+    /** Creates a generator whose seed differs per worker and per call. */
+    private static Random newRandom(int workerID) {
+        return new Random(System.nanoTime() + (long) workerID * 1000);
+    }
+
+    /** Returns a non-negative, overflow-safe, bounded pre-size hint for the output buffer. */
+    private static int initialCapacity(int batchSize, int charsPerRecord) {
+        long wanted = (long) batchSize * charsPerRecord;
+        return (int) Math.max(0L, Math.min(wanted, MAX_INITIAL_CAPACITY));
+    }
+
+    /** Picks one element of {@code values} uniformly at random. */
+    private static String pick(Random rng, String[] values) {
+        return values[rng.nextInt(values.length)];
+    }
+
+    /** One randomly generated order row; shared by the CSV and JSON renderers. */
+    private static final class Order {
+        final long orderId;
+        final int customerId;
+        final String productName;
+        final String category;
+        final String brand;
+        final int quantity;
+        final double unitPrice;
+        final double totalAmount;
+        final String status;
+        final String orderDate;
+        final String region;
+
+        /**
+         * Draws all random fields from {@code rng}.
+         *
+         * @param rng     random source
+         * @param orderId ID assigned to this order
+         * @param sdf     formatter used to render the order date (a random instant
+         *                within the 365 days before now)
+         */
+        Order(Random rng, long orderId, SimpleDateFormat sdf) {
+            this.orderId = orderId;
+            this.quantity = rng.nextInt(10) + 1;
+            // Price in whole cents (1..50000) converted to currency units.
+            this.unitPrice = (rng.nextInt(50000) + 1) / 100.0;
+            this.totalAmount = quantity * unitPrice;
+            this.productName = "Product_" + pick(rng, BRANDS) + "_" + rng.nextInt(1000);
+            this.category = pick(rng, CATEGORIES);
+            this.brand = pick(rng, BRANDS);
+            this.status = pick(rng, STATUSES);
+            this.region = pick(rng, REGIONS);
+            long offsetMs = (long) rng.nextInt(365 * 24 * 3600) * 1000L;
+            this.orderDate = sdf.format(new Date(System.currentTimeMillis() - offsetMs));
+            this.customerId = rng.nextInt(100000) + 1;
+        }
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ExamplesMain.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ExamplesMain.java
new file mode 100644
index 00000000000..41c46d8ab42
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ExamplesMain.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+/**
+ * Entry point for running all examples.
+ *
+ * <p>Usage:
+ * <pre>
+ * # Run all examples
+ * java -cp java-doris-sdk-1.0.0.jar
org.apache.doris.sdk.examples.ExamplesMain all
+ *
+ * # Run individual examples
+ * java -cp java-doris-sdk-1.0.0.jar
org.apache.doris.sdk.examples.ExamplesMain simple
+ * java -cp java-doris-sdk-1.0.0.jar
org.apache.doris.sdk.examples.ExamplesMain single
+ * java -cp java-doris-sdk-1.0.0.jar
org.apache.doris.sdk.examples.ExamplesMain json
+ * java -cp java-doris-sdk-1.0.0.jar
org.apache.doris.sdk.examples.ExamplesMain concurrent
+ * java -cp java-doris-sdk-1.0.0.jar
org.apache.doris.sdk.examples.ExamplesMain gzip
+ * </pre>
+ */
+public class ExamplesMain {
+
+ public static void main(String[] args) {
+ String mode = (args.length > 0) ? args[0].toLowerCase() : "all";
+
+ switch (mode) {
+ case "all":
+ System.out.println("\n>>> Running: simple");
+ SimpleConfigExample.run();
+ System.out.println("\n>>> Running: single");
+ SingleBatchExample.run();
+ System.out.println("\n>>> Running: json");
+ JsonExample.run();
+ System.out.println("\n>>> Running: concurrent");
+ ConcurrentExample.run();
+ System.out.println("\n>>> Running: gzip");
+ GzipExample.run();
+ break;
+ case "simple":
+ SimpleConfigExample.run();
+ break;
+ case "single":
+ SingleBatchExample.run();
+ break;
+ case "json":
+ JsonExample.run();
+ break;
+ case "concurrent":
+ ConcurrentExample.run();
+ break;
+ case "gzip":
+ GzipExample.run();
+ break;
+ default:
+ System.err.println("Unknown example: " + mode);
+ System.err.println("Available: all | simple | single | json |
concurrent | gzip");
+ System.exit(1);
+ }
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/GzipExample.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/GzipExample.java
new file mode 100644
index 00000000000..9e5a30c7619
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/GzipExample.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.model.LoadResponse;
+
+import java.util.Arrays;
+
+/**
+ * Gzip compression example.
+ * The SDK compresses the request body transparently before sending.
+ * No need to pre-compress the data — just set enableGzip(true).
+ * Mirrors Go SDK's GzipExample.
+ */
+public class GzipExample {
+
+ public static void run() {
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://localhost:8030"))
+ .user("root")
+ .password("")
+ .database("test")
+ .table("orders")
+ .format(DorisConfig.defaultJsonFormat())
+ .retry(DorisConfig.defaultRetry())
+ .groupCommit(GroupCommitMode.OFF)
+ .enableGzip(true) // SDK compresses the body and sets
compress_type=gz header automatically
+ .build();
+
+ try (DorisLoadClient client = DorisClient.newClient(config)) {
+ String jsonData =
+ "{\"order_id\": 1001, \"customer_id\": 201,
\"product_name\": \"Laptop\","
+ + " \"category\": \"Electronics\", \"brand\": \"Dell\",
\"quantity\": 1,"
+ + " \"unit_price\": 999.99, \"total_amount\": 999.99,
\"status\": \"active\","
+ + " \"order_date\": \"2026-01-01 12:00:00\", \"region\":
\"North\"}\n"
+ + "{\"order_id\": 1002, \"customer_id\": 202,
\"product_name\": \"Phone\","
+ + " \"category\": \"Electronics\", \"brand\": \"Apple\",
\"quantity\": 1,"
+ + " \"unit_price\": 799.99, \"total_amount\": 799.99,
\"status\": \"active\","
+ + " \"order_date\": \"2026-01-02 10:00:00\", \"region\":
\"South\"}\n"
+ + "{\"order_id\": 1003, \"customer_id\": 203,
\"product_name\": \"Tablet\","
+ + " \"category\": \"Electronics\", \"brand\": \"Samsung\",
\"quantity\": 2,"
+ + " \"unit_price\": 349.99, \"total_amount\": 699.98,
\"status\": \"active\","
+ + " \"order_date\": \"2026-01-03 09:00:00\", \"region\":
\"East\"}\n";
+
+ LoadResponse response =
client.load(DorisClient.stringStream(jsonData));
+
+ System.out.println("Status: " + response.getStatus());
+ if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ System.out.println("Loaded rows: " +
response.getRespContent().getNumberLoadedRows());
+ System.out.println("Load bytes: " +
response.getRespContent().getLoadBytes());
+ System.out.println("Label: " +
response.getRespContent().getLabel());
+ } else {
+ System.out.println("Message: " +
response.getRespContent().getMessage());
+ System.out.println("Error URL: " +
response.getRespContent().getErrorUrl());
+ }
+ } catch (Exception e) {
+ System.err.println("Load failed: " + e.getMessage());
+ }
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/JsonExample.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/JsonExample.java
new file mode 100644
index 00000000000..82ec00c32a3
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/JsonExample.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.config.JsonFormat;
+import org.apache.doris.sdk.load.config.RetryConfig;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Production-level JSON data loading demo.
+ * Loads 50,000 JSON Lines order records in a single stream load call.
+ * Mirrors Go SDK's RunJSONExample.
+ */
+public class JsonExample {
+
+ private static final Logger log =
LoggerFactory.getLogger(JsonExample.class);
+ private static final int JSON_BATCH_SIZE = 50_000;
+
+ public static void run() {
+ System.out.println("=== Production-Level JSON Data Loading Demo ===");
+ log.info("Starting JSON loading demo with {} order records",
JSON_BATCH_SIZE);
+
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://localhost:8030"))
+ .user("root")
+ .password("")
+ .database("test")
+ .table("orders")
+ .labelPrefix("prod_json")
+ .format(new JsonFormat(JsonFormat.Type.OBJECT_LINE))
+
.retry(RetryConfig.builder().maxRetryTimes(3).baseIntervalMs(2000).maxTotalTimeMs(60000).build())
+ .groupCommit(GroupCommitMode.ASYNC)
+ .build();
+
+ try (DorisLoadClient client = DorisClient.newClient(config)) {
+ log.info("JSON load client created successfully");
+
+ // Generate realistic JSON Lines order data
+ log.info("Generating {} JSON order records...", JSON_BATCH_SIZE);
+ String jsonData = DataGenerator.generateOrderJSON(0,
JSON_BATCH_SIZE);
+
+ log.info("Starting JSON load operation for {} order records...",
JSON_BATCH_SIZE);
+ long loadStart = System.currentTimeMillis();
+
+ LoadResponse response =
client.load(DorisClient.stringStream(jsonData));
+
+ long loadTimeMs = System.currentTimeMillis() - loadStart;
+ double loadTimeSec = loadTimeMs / 1000.0;
+
+ if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ double sizeMB = jsonData.length() / 1024.0 / 1024.0;
+ System.out.println("JSON load completed successfully!");
+ System.out.printf("JSON Records: %d, Size: %.1f MB, Time: %.2f
s%n",
+ JSON_BATCH_SIZE, sizeMB, loadTimeSec);
+ System.out.printf("JSON Rate: %.0f records/sec, %.1f MB/sec%n",
+ JSON_BATCH_SIZE / loadTimeSec, sizeMB / loadTimeSec);
+ System.out.printf("Label: %s, Loaded: %d rows%n",
+ response.getRespContent().getLabel(),
+ response.getRespContent().getNumberLoadedRows());
+ if (response.getRespContent().getLoadBytes() > 0) {
+ double avgBytes = (double)
response.getRespContent().getLoadBytes()
+ / response.getRespContent().getNumberLoadedRows();
+ System.out.printf("Average bytes per JSON record: %.1f%n",
avgBytes);
+ }
+ } else {
+ System.err.println("JSON load failed: " +
response.getErrorMessage());
+ }
+ } catch (Exception e) {
+ System.err.println("JSON load failed: " + e.getMessage());
+ }
+
+ System.out.println("=== JSON Demo Complete ===");
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SimpleConfigExample.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SimpleConfigExample.java
new file mode 100644
index 00000000000..c9cc0bea890
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SimpleConfigExample.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.model.LoadResponse;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple configuration example — basic JSON Lines load with Group Commit
ASYNC.
+ * Mirrors Go SDK's SimpleConfigExample.
+ */
+public class SimpleConfigExample {
+
+ public static void run() {
+ Map<String, String> options = new HashMap<>();
+ options.put("strict_mode", "true");
+ options.put("max_filter_ratio", "0.1");
+
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://localhost:8030"))
+ .user("root")
+ .password("")
+ .database("test")
+ .table("orders")
+ .format(DorisConfig.defaultJsonFormat())
+ .retry(DorisConfig.defaultRetry())
+ .groupCommit(GroupCommitMode.ASYNC)
+ .options(options)
+ .build();
+
+ try (DorisLoadClient client = DorisClient.newClient(config)) {
+ String jsonData = "{\"order_id\": 1, \"customer_id\": 101,
\"product_name\": \"Laptop\","
+ + " \"category\": \"Electronics\", \"brand\": \"Dell\",
\"quantity\": 1,"
+ + " \"unit_price\": 999.99, \"total_amount\": 999.99,
\"status\": \"active\","
+ + " \"order_date\": \"2026-01-01 12:00:00\", \"region\":
\"North\"}\n"
+ + "{\"order_id\": 2, \"customer_id\": 102,
\"product_name\": \"Phone\","
+ + " \"category\": \"Electronics\", \"brand\": \"Samsung\",
\"quantity\": 2,"
+ + " \"unit_price\": 499.99, \"total_amount\": 999.98,
\"status\": \"active\","
+ + " \"order_date\": \"2026-01-02 10:00:00\", \"region\":
\"South\"}\n"
+ + "{\"order_id\": 3, \"customer_id\": 103,
\"product_name\": \"Headphones\","
+ + " \"category\": \"Electronics\", \"brand\": \"Sony\",
\"quantity\": 1,"
+ + " \"unit_price\": 199.99, \"total_amount\": 199.99,
\"status\": \"active\","
+ + " \"order_date\": \"2026-01-03 09:00:00\", \"region\":
\"East\"}\n";
+
+ LoadResponse response =
client.load(DorisClient.stringStream(jsonData));
+
+ System.out.println("Load completed!");
+ System.out.println("Status: " + response.getStatus());
+ if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ System.out.println("Loaded rows: " +
response.getRespContent().getNumberLoadedRows());
+ System.out.println("Load bytes: " +
response.getRespContent().getLoadBytes());
+ } else {
+ System.out.println("Error: " + response.getErrorMessage());
+ }
+ } catch (Exception e) {
+ System.err.println("Load failed: " + e.getMessage());
+ }
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SingleBatchExample.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SingleBatchExample.java
new file mode 100644
index 00000000000..7d5f7fff618
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SingleBatchExample.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.config.RetryConfig;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Production-level single-threaded large batch loading demo.
+ * Loads 100,000 CSV order records in a single stream load call.
+ * Mirrors Go SDK's RunSingleBatchExample.
+ */
+public class SingleBatchExample {
+
+ private static final Logger log =
LoggerFactory.getLogger(SingleBatchExample.class);
+ private static final int BATCH_SIZE = 100_000;
+
+ public static void run() {
+ System.out.println("=== Production-Level Large Batch Loading Demo
===");
+ log.info("Starting large batch loading demo with {} order records",
BATCH_SIZE);
+
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://localhost:8030"))
+ .user("root")
+ .password("")
+ .database("test")
+ .table("orders")
+ .labelPrefix("prod_batch")
+ .format(DorisConfig.defaultCsvFormat())
+
.retry(RetryConfig.builder().maxRetryTimes(3).baseIntervalMs(2000).maxTotalTimeMs(60000).build())
+ .groupCommit(GroupCommitMode.OFF)
+ .build();
+
+ try (DorisLoadClient client = DorisClient.newClient(config)) {
+ log.info("Load client created successfully");
+
+ // Generate large batch of realistic order data
+ log.info("Generating {} order records...", BATCH_SIZE);
+ String data = DataGenerator.generateOrderCSV(0, BATCH_SIZE);
+ log.info("Data generated: {} MB", data.length() / 1024.0 / 1024.0);
+
+ log.info("Starting load operation for {} order records...",
BATCH_SIZE);
+ long loadStart = System.currentTimeMillis();
+
+ LoadResponse response =
client.load(DorisClient.stringStream(data));
+
+ long loadTimeMs = System.currentTimeMillis() - loadStart;
+ double loadTimeSec = loadTimeMs / 1000.0;
+
+ if (response.getStatus() == LoadResponse.Status.SUCCESS) {
+ double sizeMB = data.length() / 1024.0 / 1024.0;
+ System.out.println("Load completed successfully!");
+ System.out.printf("Records: %d, Size: %.1f MB, Time: %.2f
s%n", BATCH_SIZE, sizeMB, loadTimeSec);
+ System.out.printf("Rate: %.0f records/sec, %.1f MB/sec%n",
+ BATCH_SIZE / loadTimeSec, sizeMB / loadTimeSec);
+ System.out.printf("Label: %s, Loaded: %d rows%n",
+ response.getRespContent().getLabel(),
+ response.getRespContent().getNumberLoadedRows());
+ } else {
+ System.err.println("Load failed: " +
response.getErrorMessage());
+ }
+ } catch (Exception e) {
+ System.err.println("Load failed: " + e.getMessage());
+ }
+
+ System.out.println("=== Demo Complete ===");
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java
new file mode 100644
index 00000000000..fdcfa11cc0a
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.RequestBuilder;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Thread-safe Doris stream load client.
+ *
+ * <p>Usage:
+ * <pre>
+ * DorisLoadClient client = new DorisLoadClient(config);
+ * LoadResponse resp = client.load(new ByteArrayInputStream(data));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... }
+ * </pre>
+ *
+ * <p>Thread safety: this instance can be shared across threads.
+ * Each {@link #load} call must receive an independent InputStream.
+ */
+public class DorisLoadClient implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(DorisLoadClient.class);
+ /** Absolute maximum for a single backoff interval: 5 minutes. */
+ private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L;
+
+ private final DorisConfig config;
+ private final StreamLoader streamLoader;
+
+ public DorisLoadClient(DorisConfig config) {
+ this.config = config;
+ this.streamLoader = new StreamLoader();
+ }
+
+ /** Package-private constructor for testing with a mock StreamLoader. */
+ DorisLoadClient(DorisConfig config, StreamLoader streamLoader) {
+ this.config = config;
+ this.streamLoader = streamLoader;
+ }
+
+ /**
+ * Loads data from the given InputStream into Doris via stream load.
+ * The InputStream is fully consumed and buffered before the first attempt.
+ * Retries with exponential backoff on retryable errors (network/HTTP
failures).
+ * Business failures (bad data, schema mismatch, auth) are returned
immediately without retry.
+ *
+ * @param inputStream data to load (consumed once; must not be shared
across threads)
+ * @return LoadResponse with status SUCCESS or FAILURE
+ * @throws IOException if the stream cannot be read or all retries are
exhausted
+ */
+ public LoadResponse load(InputStream inputStream) throws IOException {
+ int maxRetries = 6;
+ long baseIntervalMs = 1000L;
+ long maxTotalTimeMs = 60000L;
+
+ if (config.getRetry() != null) {
+ maxRetries = config.getRetry().getMaxRetryTimes();
+ baseIntervalMs = config.getRetry().getBaseIntervalMs();
+ maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs();
+ }
+
+ log.info("Starting stream load: {}.{}", config.getDatabase(),
config.getTable());
+
+ // Buffer the InputStream once so retries can replay the body
+ byte[] bodyData = readAll(inputStream);
+
+ // Compress once before the retry loop (avoids re-compressing on each
retry)
+ if (config.isEnableGzip()) {
+ bodyData = gzipCompress(bodyData);
+ }
+ Exception lastException = null;
+ LoadResponse lastResponse = null;
+ long operationStart = System.currentTimeMillis();
+
+ for (int attempt = 0; attempt <= maxRetries; attempt++) {
+ if (attempt > 0) {
+ log.info("Retry attempt {}/{}", attempt, maxRetries);
+ // Use actual wall-clock elapsed time (includes request time,
not just sleep time)
+ long elapsed = System.currentTimeMillis() - operationStart;
+ long backoff = calculateBackoffMs(attempt, baseIntervalMs,
maxTotalTimeMs, elapsed);
+
+ if (maxTotalTimeMs > 0 && elapsed + backoff > maxTotalTimeMs) {
+ log.warn("Next retry backoff ({}ms) would exceed total
limit ({}ms). Stopping.", backoff, maxTotalTimeMs);
+ break;
+ }
+
+ log.info("Waiting {}ms before retry (elapsed so far: {}ms)",
backoff, elapsed);
+ sleep(backoff);
+ } else {
+ log.info("Initial load attempt");
+ }
+
+ try {
+ HttpPut request = RequestBuilder.build(config, bodyData,
attempt);
+ lastResponse = streamLoader.execute(request);
+
+ if (lastResponse.getStatus() == LoadResponse.Status.SUCCESS) {
+ log.info("Stream load succeeded on attempt {}", attempt +
1);
+ return lastResponse;
+ }
+
+ // Business failure (bad data, schema mismatch, auth) — do not
retry
+ log.error("Load failed (non-retryable): {}",
lastResponse.getErrorMessage());
+ return lastResponse;
+
+ } catch (StreamLoadException e) {
+ // Retryable: network error, HTTP 5xx, etc.
+ lastException = e;
+ log.error("Attempt {} failed with retryable error: ", attempt
+ 1, e);
+
+ // Check elapsed wall-clock time (same guard as the Exception
branch below)
+ long elapsed = System.currentTimeMillis() - operationStart;
+ if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) {
+ log.warn("Total elapsed time ({}ms) exceeded limit ({}ms),
stopping retries.", elapsed, maxTotalTimeMs);
+ break;
+ }
+ } catch (Exception e) {
+ // Wrap unexpected exceptions as retryable
+ lastException = new StreamLoadException("Unexpected error
building request: " + e.getMessage(), e);
+ log.error("Attempt {} failed with unexpected error: ", attempt
+ 1, e);
+
+ // Check elapsed wall-clock time
+ long elapsed = System.currentTimeMillis() - operationStart;
+ if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) {
+ log.warn("Total elapsed time ({}ms) exceeded limit ({}ms),
stopping retries.", elapsed, maxTotalTimeMs);
+ break;
+ }
+ }
+ }
+
+ log.debug("Total operation time: {}ms", System.currentTimeMillis() -
operationStart);
+
+ if (lastException != null) {
+ throw new IOException("Stream load failed after " + (maxRetries +
1) + " attempts", lastException);
+ }
+
+ if (lastResponse != null) {
+ return lastResponse;
+ }
+
+ throw new IOException("Stream load failed: unknown error");
+ }
+
+ /**
+ * Calculates exponential backoff interval in milliseconds.
+ * Package-private for unit testing.
+ *
+ * Formula: base * 2^(attempt-1), constrained by remaining total time and
absolute max.
+ *
+ * @param elapsedMs actual wall-clock time elapsed since the operation
started (includes request time)
+ */
+ static long calculateBackoffMs(int attempt, long baseIntervalMs, long
maxTotalTimeMs, long elapsedMs) {
+ if (attempt <= 0) return 0;
+ long intervalMs = baseIntervalMs * (1L << (attempt - 1)); // base *
2^(attempt-1)
+
+ if (maxTotalTimeMs > 0) {
+ long remaining = maxTotalTimeMs - elapsedMs - 5000; // reserve 5s
for the next request
+ if (remaining <= 0) {
+ intervalMs = 0;
+ } else if (intervalMs > remaining) {
+ intervalMs = remaining;
+ }
+ }
+
+ if (intervalMs > ABSOLUTE_MAX_INTERVAL_MS) intervalMs =
ABSOLUTE_MAX_INTERVAL_MS;
+ if (intervalMs < 0) intervalMs = 0;
+ return intervalMs;
+ }
+
+ private static byte[] readAll(InputStream inputStream) throws IOException {
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ byte[] chunk = new byte[8192];
+ int read;
+ while ((read = inputStream.read(chunk)) != -1) {
+ buffer.write(chunk, 0, read);
+ }
+ return buffer.toByteArray();
+ }
+
+ private static byte[] gzipCompress(byte[] data) throws IOException {
+ ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
+ gzip.write(data);
+ }
+ return compressed.toByteArray();
+ }
+
+ private static void sleep(long ms) {
+ if (ms <= 0) return;
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ streamLoader.close();
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/CsvFormat.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/CsvFormat.java
new file mode 100644
index 00000000000..19c386911f9
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/CsvFormat.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * CSV format configuration for stream load.
+ */
+public class CsvFormat implements Format {
+
+ private final String columnSeparator;
+ private final String lineDelimiter;
+
+ public CsvFormat(String columnSeparator, String lineDelimiter) {
+ this.columnSeparator = columnSeparator;
+ this.lineDelimiter = lineDelimiter;
+ }
+
+ @Override
+ public String getFormatType() {
+ return "csv";
+ }
+
+ @Override
+ public Map<String, String> getHeaders() {
+ Map<String, String> headers = new HashMap<>();
+ headers.put("format", "csv");
+ headers.put("column_separator", columnSeparator);
+ headers.put("line_delimiter", lineDelimiter);
+ return headers;
+ }
+
+ public String getColumnSeparator() { return columnSeparator; }
+ public String getLineDelimiter() { return lineDelimiter; }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/DorisConfig.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/DorisConfig.java
new file mode 100644
index 00000000000..643886c43c4
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/DorisConfig.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Configuration for Doris stream load operations.
+ * Use {@link Builder} to construct instances.
+ *
+ * <p>Instances are immutable: the endpoint list and the options map are copied
+ * when the configuration is built, so later changes to the collections handed
+ * to the builder do not affect an existing configuration.
+ *
+ * <pre>
+ * DorisConfig config = DorisConfig.builder()
+ *     .endpoints(Arrays.asList("http://fe1:8030", "http://fe2:8030"))
+ *     .user("root").password("secret")
+ *     .database("mydb").table("mytable")
+ *     .format(DorisConfig.defaultJsonFormat())
+ *     .retry(DorisConfig.defaultRetry())
+ *     .groupCommit(GroupCommitMode.ASYNC)
+ *     .build();
+ * </pre>
+ */
+public class DorisConfig {
+
+    private final List<String> endpoints;
+    private final String user;
+    private final String password;
+    private final String database;
+    private final String table;
+    private final String labelPrefix;
+    private final String label;
+    private final Format format;
+    private final RetryConfig retry;
+    private final GroupCommitMode groupCommit;
+    private final boolean enableGzip;
+    private final Map<String, String> options;
+
+    private DorisConfig(Builder builder) {
+        // Copy before wrapping: an unmodifiable view alone would still reflect
+        // later mutations of the list the caller passed to the builder.
+        this.endpoints = Collections.unmodifiableList(new ArrayList<>(builder.endpoints));
+        this.user = builder.user;
+        // A null password would otherwise be concatenated as the text "null"
+        // when credentials are assembled; fall back to the builder default.
+        this.password = builder.password != null ? builder.password : "";
+        this.database = builder.database;
+        this.table = builder.table;
+        this.labelPrefix = builder.labelPrefix;
+        this.label = builder.label;
+        this.format = builder.format;
+        this.retry = builder.retry;
+        // Null means "not configured"; normalise to the builder default so
+        // consumers can switch on the mode without a null check.
+        this.groupCommit = builder.groupCommit != null ? builder.groupCommit : GroupCommitMode.OFF;
+        this.enableGzip = builder.enableGzip;
+        this.options = builder.options != null
+                ? Collections.unmodifiableMap(new HashMap<>(builder.options))
+                : Collections.<String, String>emptyMap();
+    }
+
+    // --- Convenience factory methods (mirrors Go SDK) ---
+
+    /** Default JSON format: JSON Lines (one object per line). */
+    public static JsonFormat defaultJsonFormat() {
+        return new JsonFormat(JsonFormat.Type.OBJECT_LINE);
+    }
+
+    /**
+     * Default CSV format: comma separator and the two-character text {@code \n}
+     * (a backslash followed by {@code n}, not a newline character) as delimiter.
+     */
+    public static CsvFormat defaultCsvFormat() {
+        return new CsvFormat(",", "\\n");
+    }
+
+    /** Default retry config, as produced by {@link RetryConfig#defaultRetry()}. */
+    public static RetryConfig defaultRetry() {
+        return RetryConfig.defaultRetry();
+    }
+
+    /** Returns a new, empty {@link Builder}. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    // --- Getters ---
+
+    /** Returns the unmodifiable list of configured endpoints. */
+    public List<String> getEndpoints() {
+        return endpoints;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    /** Returns the password; never {@code null} (empty when none was set). */
+    public String getPassword() {
+        return password;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    /** Returns the label prefix, or {@code null} when none was set. */
+    public String getLabelPrefix() {
+        return labelPrefix;
+    }
+
+    /** Returns the explicit label, or {@code null} when none was set. */
+    public String getLabel() {
+        return label;
+    }
+
+    public Format getFormat() {
+        return format;
+    }
+
+    public RetryConfig getRetry() {
+        return retry;
+    }
+
+    /** Returns the group commit mode; never {@code null}. */
+    public GroupCommitMode getGroupCommit() {
+        return groupCommit;
+    }
+
+    public boolean isEnableGzip() {
+        return enableGzip;
+    }
+
+    /** Returns the unmodifiable map of extra options; never {@code null}. */
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    // --- Builder ---
+
+    /**
+     * Mutable builder for {@link DorisConfig}. Validation happens in
+     * {@link #build()}; the setters only record the supplied values.
+     */
+    public static class Builder {
+        private List<String> endpoints;
+        private String user;
+        private String password = "";
+        private String database;
+        private String table;
+        private String labelPrefix;
+        private String label;
+        private Format format;
+        private RetryConfig retry = RetryConfig.defaultRetry();
+        private GroupCommitMode groupCommit = GroupCommitMode.OFF;
+        private boolean enableGzip = false;
+        private Map<String, String> options;
+
+        public Builder endpoints(List<String> val) {
+            this.endpoints = val;
+            return this;
+        }
+
+        public Builder user(String val) {
+            this.user = val;
+            return this;
+        }
+
+        public Builder password(String val) {
+            this.password = val;
+            return this;
+        }
+
+        public Builder database(String val) {
+            this.database = val;
+            return this;
+        }
+
+        public Builder table(String val) {
+            this.table = val;
+            return this;
+        }
+
+        public Builder labelPrefix(String val) {
+            this.labelPrefix = val;
+            return this;
+        }
+
+        public Builder label(String val) {
+            this.label = val;
+            return this;
+        }
+
+        public Builder format(Format val) {
+            this.format = val;
+            return this;
+        }
+
+        public Builder retry(RetryConfig val) {
+            this.retry = val;
+            return this;
+        }
+
+        public Builder groupCommit(GroupCommitMode val) {
+            this.groupCommit = val;
+            return this;
+        }
+
+        public Builder enableGzip(boolean val) {
+            this.enableGzip = val;
+            return this;
+        }
+
+        public Builder options(Map<String, String> val) {
+            this.options = val;
+            return this;
+        }
+
+        /**
+         * Validates the collected values and creates the configuration.
+         *
+         * @return an immutable {@link DorisConfig}
+         * @throws IllegalArgumentException if user, database or table is null or
+         *         empty, if endpoints is null, empty or contains a null/blank
+         *         entry, or if format is null
+         */
+        public DorisConfig build() {
+            if (user == null || user.isEmpty()) {
+                throw new IllegalArgumentException("user cannot be empty");
+            }
+            if (database == null || database.isEmpty()) {
+                throw new IllegalArgumentException("database cannot be empty");
+            }
+            if (table == null || table.isEmpty()) {
+                throw new IllegalArgumentException("table cannot be empty");
+            }
+            if (endpoints == null || endpoints.isEmpty()) {
+                throw new IllegalArgumentException("endpoints cannot be empty");
+            }
+            // Reject unusable entries here instead of failing later, at request
+            // time, with a NullPointerException or a malformed URL.
+            for (String endpoint : endpoints) {
+                if (endpoint == null || endpoint.trim().isEmpty()) {
+                    throw new IllegalArgumentException("endpoints cannot contain null or blank entries");
+                }
+            }
+            if (format == null) {
+                throw new IllegalArgumentException("format cannot be null");
+            }
+            return new DorisConfig(this);
+        }
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/Format.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/Format.java
new file mode 100644
index 00000000000..5255a994a07
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/Format.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+import java.util.Map;
+
+/**
+ * Stream load data format interface.
+ * Implementations: JsonFormat, CsvFormat.
+ *
+ * <p>A format identifies itself through {@link #getFormatType()} and supplies
+ * the headers that describe it through {@link #getHeaders()}.
+ */
+public interface Format {
+ /** Returns the format type string, e.g. "json", "csv". */
+ String getFormatType();
+
+ /** Returns format-specific HTTP headers for the stream load request. */
+ Map<String, String> getHeaders();
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/GroupCommitMode.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/GroupCommitMode.java
new file mode 100644
index 00000000000..a19290101d6
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/GroupCommitMode.java
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+/**
+ * Group Commit mode for stream load.
+ * When SYNC or ASYNC is enabled, all label configurations are automatically ignored.
+ */
+public enum GroupCommitMode {
+ /** Synchronous group commit: data visible immediately. */
+ SYNC,
+ /** Asynchronous group commit: highest throughput. */
+ ASYNC,
+ /** Disabled: traditional stream load mode. */
+ OFF
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/JsonFormat.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/JsonFormat.java
new file mode 100644
index 00000000000..3d67718b430
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/JsonFormat.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Format} implementation describing JSON payloads for stream load.
+ * Two layouts are available: JSON Lines (one object per line) and a single JSON array.
+ */
+public class JsonFormat implements Format {
+
+    /** Layout of the JSON payload. */
+    public enum Type {
+        /** JSON Lines: one JSON object per line, e.g. {"a":1}\n{"a":2} */
+        OBJECT_LINE,
+        /** JSON Array: a single JSON array, e.g. [{"a":1},{"a":2}] */
+        ARRAY
+    }
+
+    private final Type type;
+
+    /**
+     * Creates a JSON format description.
+     *
+     * @param type payload layout; any value other than {@link Type#OBJECT_LINE}
+     *             produces the array-style headers
+     */
+    public JsonFormat(Type type) {
+        this.type = type;
+    }
+
+    /** Returns the constant format type {@code "json"}. */
+    @Override
+    public String getFormatType() {
+        return "json";
+    }
+
+    /**
+     * Returns a fresh, mutable map of JSON headers. {@code format} and
+     * {@code strip_outer_array} are always present; {@code read_json_by_line}
+     * is added only for {@link Type#OBJECT_LINE}.
+     */
+    @Override
+    public Map<String, String> getHeaders() {
+        final boolean lineDelimited = Type.OBJECT_LINE == type;
+        final Map<String, String> jsonHeaders = new HashMap<>();
+        jsonHeaders.put("format", "json");
+        // The outer array is stripped only when the payload is a single JSON array.
+        jsonHeaders.put("strip_outer_array", lineDelimited ? "false" : "true");
+        if (lineDelimited) {
+            jsonHeaders.put("read_json_by_line", "true");
+        }
+        return jsonHeaders;
+    }
+
+    /** Returns the layout given to the constructor. */
+    public Type getType() {
+        return type;
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/RetryConfig.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/RetryConfig.java
new file mode 100644
index 00000000000..d0cfac45574
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/RetryConfig.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+/**
+ * Exponential backoff retry configuration.
+ *
+ * <p>Immutable holder for three settings: the maximum number of retries, the
+ * base interval and the total time limit (both in milliseconds).
+ * Defaults: 6 retries, 1s base interval, 60s total time limit. The original
+ * author gives the intended backoff sequence for the defaults as
+ * 1s, 2s, 4s, 8s, 16s, 32s (~63s total); the loop that applies these values is
+ * not part of this class.
+ */
+public class RetryConfig {
+
+    private final int maxRetryTimes;
+    private final long baseIntervalMs;
+    private final long maxTotalTimeMs;
+
+    private RetryConfig(Builder source) {
+        maxRetryTimes = source.maxRetryTimes;
+        baseIntervalMs = source.baseIntervalMs;
+        maxTotalTimeMs = source.maxTotalTimeMs;
+    }
+
+    /** Creates default retry config (6 retries, 1s base interval, 60s total limit). */
+    public static RetryConfig defaultRetry() {
+        Builder defaults = builder();
+        defaults.maxRetryTimes(6);
+        defaults.baseIntervalMs(1000);
+        defaults.maxTotalTimeMs(60000);
+        return defaults.build();
+    }
+
+    /** Returns a new {@link Builder} pre-populated with the default values. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Returns the maximum number of retries. */
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    /** Returns the base interval in milliseconds. */
+    public long getBaseIntervalMs() {
+        return baseIntervalMs;
+    }
+
+    /** Returns the total time limit in milliseconds. */
+    public long getMaxTotalTimeMs() {
+        return maxTotalTimeMs;
+    }
+
+    /** Mutable builder for {@link RetryConfig}; values are validated in {@link #build()}. */
+    public static class Builder {
+        private int maxRetryTimes = 6;
+        private long baseIntervalMs = 1000;
+        private long maxTotalTimeMs = 60000;
+
+        public Builder maxRetryTimes(int val) {
+            this.maxRetryTimes = val;
+            return this;
+        }
+
+        public Builder baseIntervalMs(long val) {
+            this.baseIntervalMs = val;
+            return this;
+        }
+
+        public Builder maxTotalTimeMs(long val) {
+            this.maxTotalTimeMs = val;
+            return this;
+        }
+
+        /**
+         * Validates the settings and creates the configuration.
+         *
+         * @return an immutable {@link RetryConfig}
+         * @throws IllegalArgumentException if any of the three values is negative;
+         *         the first offending setting, in declaration order, is reported
+         */
+        public RetryConfig build() {
+            rejectNegative(maxRetryTimes, "maxRetryTimes");
+            rejectNegative(baseIntervalMs, "baseIntervalMs");
+            rejectNegative(maxTotalTimeMs, "maxTotalTimeMs");
+            return new RetryConfig(this);
+        }
+
+        /** Throws {@link IllegalArgumentException} naming {@code setting} when {@code value} is below zero. */
+        private static void rejectNegative(long value, String setting) {
+            if (value < 0) {
+                throw new IllegalArgumentException(setting + " cannot be negative");
+            }
+        }
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/exception/StreamLoadException.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/exception/StreamLoadException.java
new file mode 100644
index 00000000000..d47a2d7db26
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/exception/StreamLoadException.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.exception;
+
+/**
+ * Thrown for retryable HTTP-level stream load failures (e.g. HTTP 5xx, connection errors).
+ * Non-retryable business failures (bad data, auth error) are returned via LoadResponse.
+ *
+ * <p>This is an unchecked exception.
+ */
+public class StreamLoadException extends RuntimeException {
+
+ /**
+  * Creates an exception with a detail message and no cause.
+  *
+  * @param message description of the failure
+  */
+ public StreamLoadException(String message) {
+ super(message);
+ }
+
+ /**
+  * Creates an exception with a detail message and the underlying cause.
+  *
+  * @param message description of the failure
+  * @param cause the exception that triggered this failure
+  */
+ public StreamLoadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java
new file mode 100644
index 00000000000..0a86b7896fc
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Builds HttpPut requests for Doris stream load.
+ * Handles URL and header assembly, label generation, and group commit logic.
+ */
+public class RequestBuilder {
+
+    private static final Logger log = LoggerFactory.getLogger(RequestBuilder.class);
+    private static final String HTTP_SCHEME = "http://";
+    private static final String HTTPS_SCHEME = "https://";
+    /** Format arguments, in order: scheme prefix, host:port, database, table. */
+    private static final String STREAM_LOAD_PATTERN = "%s%s/api/%s/%s/_stream_load";
+    private static final Random RANDOM = new Random();
+
+    /**
+     * Builds an HttpPut request for the given config and data.
+     *
+     * <p>The target endpoint is chosen at random from the configured list. An
+     * endpoint that starts with {@code https://} keeps that scheme; every other
+     * endpoint is addressed over {@code http://}.
+     *
+     * @param config DorisConfig
+     * @param data pre-buffered (and optionally pre-compressed) request body bytes
+     * @param attempt 0 = first attempt, >0 = retry number
+     * @return the request, carrying basic auth, stream load headers and, unless
+     *         group commit is enabled, a {@code label} header
+     */
+    public static HttpPut build(DorisConfig config, byte[] data, int attempt) throws Exception {
+        String endpoint = pickRawEndpoint(config.getEndpoints());
+        String url = buildUrl(endpoint, config.getDatabase(), config.getTable());
+
+        HttpPut request = new HttpPut(url);
+        request.setEntity(new ByteArrayEntity(data));
+
+        // Basic auth
+        String credentials = config.getUser() + ":" + config.getPassword();
+        String encoded = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+        request.setHeader("Authorization", "Basic " + encoded);
+        request.setHeader("Expect", "100-continue");
+
+        // Build and apply all stream load headers
+        Map<String, String> allHeaders = buildStreamLoadHeaders(config);
+        for (Map.Entry<String, String> entry : allHeaders.entrySet()) {
+            request.setHeader(entry.getKey(), entry.getValue());
+        }
+
+        // Label handling: skip labels when group commit is enabled
+        boolean groupCommitEnabled = allHeaders.containsKey("group_commit");
+        if (groupCommitEnabled) {
+            if (config.getLabel() != null && !config.getLabel().isEmpty()) {
+                log.warn("Custom label '{}' specified but group_commit is enabled. Removing label.", config.getLabel());
+            }
+            if (config.getLabelPrefix() != null && !config.getLabelPrefix().isEmpty()) {
+                log.warn("Label prefix '{}' specified but group_commit is enabled. Removing label prefix.", config.getLabelPrefix());
+            }
+            // Also remove any label header that may have been passed through options
+            if (request.containsHeader("label")) {
+                log.warn("Label header found in options but group_commit is enabled. Removing label.");
+                request.removeHeaders("label");
+            }
+            log.info("Group commit enabled - labels removed from request headers");
+        } else {
+            String label = generateLabel(config, attempt);
+            request.setHeader("label", label);
+            if (attempt > 0) {
+                log.debug("Generated retry label for attempt {}: {}", attempt, label);
+            } else {
+                log.debug("Generated label: {}", label);
+            }
+        }
+
+        return request;
+    }
+
+    /**
+     * Assembles the stream load URL for one endpoint.
+     *
+     * <p>The scheme of an {@code https://} endpoint is preserved so a secure
+     * endpoint is not silently contacted over plain HTTP (the request carries
+     * basic-auth credentials). Trailing slashes on the endpoint are dropped so
+     * the path does not start with {@code //}.
+     */
+    private static String buildUrl(String endpoint, String database, String table) {
+        String scheme = endpoint.startsWith(HTTPS_SCHEME) ? HTTPS_SCHEME : HTTP_SCHEME;
+        String hostPort = stripScheme(endpoint);
+        while (hostPort.endsWith("/")) {
+            hostPort = hostPort.substring(0, hostPort.length() - 1);
+        }
+        return String.format(STREAM_LOAD_PATTERN, scheme, hostPort, database, table);
+    }
+
+    /**
+     * Merges the stream load headers. Later sources override earlier ones:
+     * user options, then format headers, then group commit, then gzip.
+     */
+    private static Map<String, String> buildStreamLoadHeaders(DorisConfig config) {
+        Map<String, String> headers = new HashMap<>();
+
+        // User-defined options first (lowest priority)
+        if (config.getOptions() != null) {
+            headers.putAll(config.getOptions());
+        }
+
+        // Format-specific headers
+        if (config.getFormat() != null) {
+            headers.putAll(config.getFormat().getHeaders());
+        }
+
+        // Group commit. Compared with == rather than switched on, so a null mode
+        // is treated like OFF instead of raising a NullPointerException.
+        GroupCommitMode mode = config.getGroupCommit();
+        if (mode == GroupCommitMode.SYNC) {
+            headers.put("group_commit", "sync_mode");
+        } else if (mode == GroupCommitMode.ASYNC) {
+            headers.put("group_commit", "async_mode");
+        }
+
+        // Gzip compression header
+        if (config.isEnableGzip()) {
+            if (headers.containsKey("compress_type")) {
+                log.warn("Both enableGzip and options[compress_type] are set; enableGzip takes precedence.");
+            }
+            headers.put("compress_type", "gz");
+        }
+
+        return headers;
+    }
+
+    /**
+     * Produces the label for one attempt.
+     *
+     * <p>An explicit label is used verbatim on the first attempt and gets a
+     * retry suffix (attempt number, timestamp, 8 UUID characters) afterwards.
+     * Without an explicit label, the label is assembled from the label prefix
+     * (default {@code "load"}), database, table, timestamp and a random UUID,
+     * with a retry marker on later attempts.
+     */
+    private static String generateLabel(DorisConfig config, int attempt) {
+        long now = System.currentTimeMillis();
+        String uuid = UUID.randomUUID().toString();
+
+        if (config.getLabel() != null && !config.getLabel().isEmpty()) {
+            if (attempt == 0) {
+                return config.getLabel();
+            } else {
+                return config.getLabel() + "_retry_" + attempt + "_" + now + "_" + uuid.substring(0, 8);
+            }
+        }
+
+        String prefix = (config.getLabelPrefix() != null && !config.getLabelPrefix().isEmpty())
+                ? config.getLabelPrefix() : "load";
+
+        if (attempt == 0) {
+            return prefix + "_" + config.getDatabase() + "_" + config.getTable() + "_" + now + "_" + uuid;
+        } else {
+            return prefix + "_" + config.getDatabase() + "_" + config.getTable()
+                    + "_" + now + "_retry_" + attempt + "_" + uuid;
+        }
+    }
+
+    /** Picks a random endpoint and strips the http:// or https:// scheme to return host:port. */
+    static String pickEndpoint(List<String> endpoints) {
+        return stripScheme(pickRawEndpoint(endpoints));
+    }
+
+    /** Returns one randomly chosen entry of {@code endpoints}, exactly as configured. */
+    private static String pickRawEndpoint(List<String> endpoints) {
+        return endpoints.get(RANDOM.nextInt(endpoints.size()));
+    }
+
+    /** Removes a leading http:// or https:// from {@code endpoint}, if present. */
+    private static String stripScheme(String endpoint) {
+        if (endpoint.startsWith(HTTP_SCHEME)) {
+            return endpoint.substring(HTTP_SCHEME.length());
+        }
+        if (endpoint.startsWith(HTTPS_SCHEME)) {
+            return endpoint.substring(HTTPS_SCHEME.length());
+        }
+        return endpoint;
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java
new file mode 100644
index 00000000000..bcfe2e86b85
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Sends stream load requests and converts the HTTP response into a {@link LoadResponse}.
+ *
+ * <p>Uses a RedirectStrategy that treats every method as redirectable so that
+ * 307 redirects on PUT requests are followed automatically (Doris FE redirects
+ * stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+    private static final Logger log = LoggerFactory.getLogger(StreamLoader.class);
+    private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+    private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+    private final HttpClientBuilder httpClientBuilder;
+    private final ObjectMapper objectMapper;
+
+    /** Creates a loader backed by the default client configuration of this class. */
+    public StreamLoader() {
+        this.httpClientBuilder = buildHttpClient();
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /** Package-private constructor for testing with a caller-supplied HTTP client builder. */
+    StreamLoader(HttpClientBuilder httpClientBuilder) {
+        this.httpClientBuilder = httpClientBuilder;
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Executes the HTTP PUT request and returns a LoadResponse.
+     *
+     * <p>A new HTTP client is built from the builder for every call and closed
+     * before the method returns.
+     *
+     * @param request the stream load request to send
+     * @return a success or failure response parsed from a 200 reply
+     * @throws StreamLoadException for retryable errors: a non-200 status, a
+     *         missing or unparsable response body, or any {@link IOException}
+     *         raised while sending or reading (wrapped as the cause)
+     * @throws IOException declared for compatibility; I/O failures inside this
+     *         method are rethrown as {@link StreamLoadException}
+     */
+    public LoadResponse execute(HttpPut request) throws IOException {
+        log.debug("Sending HTTP PUT to {}", request.getURI());
+        long start = System.currentTimeMillis();
+
+        try (CloseableHttpClient httpClient = httpClientBuilder.build();
+                CloseableHttpResponse response = httpClient.execute(request)) {
+            int statusCode = response.getStatusLine().getStatusCode();
+            log.debug("HTTP response status: {}", statusCode);
+            log.debug("HTTP request completed in {} ms", System.currentTimeMillis() - start);
+
+            if (statusCode == 200) {
+                return parseResponse(response);
+            } else {
+                // Non-200 is retryable (e.g. 503, 429)
+                throw new StreamLoadException("stream load error: " + response.getStatusLine().toString());
+            }
+        } catch (StreamLoadException e) {
+            throw e;
+        } catch (IOException e) {
+            throw new StreamLoadException("stream load request failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Reads the response body as UTF-8 JSON and maps it to a {@link LoadResponse}.
+     * A status of "success" (case-insensitive) yields a success response; any
+     * other status yields a failure response carrying an error message.
+     */
+    private LoadResponse parseResponse(CloseableHttpResponse response) throws IOException {
+        // A 200 reply without an entity would otherwise surface as an
+        // IllegalArgumentException from EntityUtils; report it like other HTTP-level failures.
+        if (response.getEntity() == null) {
+            throw new StreamLoadException("stream load error: empty response body, status: "
+                    + response.getStatusLine());
+        }
+        byte[] bodyBytes = EntityUtils.toByteArray(response.getEntity());
+        String body = new String(bodyBytes, StandardCharsets.UTF_8);
+        log.info("Stream Load Response: {}", body);
+
+        RespContent resp = objectMapper.readValue(body, RespContent.class);
+        // A JSON literal null deserializes to a null object; fail explicitly
+        // instead of with a NullPointerException on the next line.
+        if (resp == null) {
+            throw new StreamLoadException("stream load error: unparsable response body: " + body);
+        }
+
+        if (isSuccess(resp.getStatus())) {
+            log.info("Load operation completed successfully");
+            return LoadResponse.success(resp);
+        } else {
+            log.error("Load operation failed with status: {}", resp.getStatus());
+            String errorMsg;
+            if (resp.getMessage() != null && !resp.getMessage().isEmpty()) {
+                errorMsg = "load failed. cause by: " + resp.getMessage();
+                // Mention the error URL only when the server supplied one, so the
+                // message never ends in "url: null".
+                String errorUrl = resp.getErrorUrl();
+                if (errorUrl != null && !errorUrl.isEmpty()) {
+                    errorMsg += ", please check more detail from url: " + errorUrl;
+                }
+            } else {
+                errorMsg = body;
+            }
+            return LoadResponse.failure(resp, errorMsg);
+        }
+    }
+
+    private static boolean isSuccess(String status) {
+        return "success".equalsIgnoreCase(status);
+    }
+
+    /**
+     * Creates the default client builder: connect, connection-request and socket
+     * timeouts from the constants above, redirects enabled for every HTTP
+     * method, and a permissive TLS socket factory.
+     */
+    private static HttpClientBuilder buildHttpClient() {
+        try {
+            RequestConfig requestConfig = RequestConfig.custom()
+                    .setConnectTimeout(CONNECT_TIMEOUT_MS)
+                    .setSocketTimeout(SOCKET_TIMEOUT_MS)
+                    .setConnectionRequestTimeout(CONNECT_TIMEOUT_MS)
+                    .build();
+
+            return HttpClientBuilder.create()
+                    .setDefaultRequestConfig(requestConfig)
+                    .setRedirectStrategy(new DefaultRedirectStrategy() {
+                        @Override
+                        protected boolean isRedirectable(String method) {
+                            // Allow redirects for PUT as well as GET/HEAD.
+                            return true;
+                        }
+                    })
+                    // NOTE(review): the trust strategy accepts every certificate chain
+                    // and hostname verification is disabled, so TLS connections made by
+                    // this client are not authenticated. Confirm this is intended
+                    // before using it against untrusted networks.
+                    .setSSLSocketFactory(
+                            new SSLConnectionSocketFactory(
+                                    SSLContextBuilder.create()
+                                            .loadTrustMaterial(null, (chain, authType) -> true)
+                                            .build(),
+                                    NoopHostnameVerifier.INSTANCE));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to build HTTP client", e);
+        }
+    }
+
+    /** No-op: this class holds no open client; each {@link #execute} call closes the one it builds. */
+    @Override
+    public void close() throws IOException {
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/LoadResponse.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/LoadResponse.java
new file mode 100644
index 00000000000..befbaa1f654
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/LoadResponse.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.model;
+
+/**
+ * Outcome of a stream load operation: a {@link Status}, the parsed response
+ * content and, for failures, an error message.
+ */
+public class LoadResponse {
+
+    /** Overall outcome of the load. */
+    public enum Status { SUCCESS, FAILURE }
+
+    private final Status status;
+    private final RespContent respContent;
+    private final String errorMessage;
+
+    private LoadResponse(Status status, RespContent respContent, String errorMessage) {
+        this.status = status;
+        this.respContent = respContent;
+        this.errorMessage = errorMessage;
+    }
+
+    /**
+     * Creates a {@link Status#SUCCESS} response with no error message.
+     *
+     * @param resp parsed response content
+     */
+    public static LoadResponse success(RespContent resp) {
+        return new LoadResponse(Status.SUCCESS, resp, null);
+    }
+
+    /**
+     * Creates a {@link Status#FAILURE} response.
+     *
+     * @param resp parsed response content
+     * @param errorMessage description of the failure
+     */
+    public static LoadResponse failure(RespContent resp, String errorMessage) {
+        return new LoadResponse(Status.FAILURE, resp, errorMessage);
+    }
+
+    /** Returns the outcome of the load. */
+    public Status getStatus() {
+        return status;
+    }
+
+    /** Returns the parsed response content passed to the factory method. */
+    public RespContent getRespContent() {
+        return respContent;
+    }
+
+    /** Returns the error message, or {@code null} for responses created by {@link #success}. */
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    /** Renders the status, then the error message and response content when each is non-null. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("LoadResponse{status=").append(status);
+        if (errorMessage != null) {
+            text.append(", error='").append(errorMessage).append("'");
+        }
+        if (respContent != null) {
+            text.append(", resp=").append(respContent);
+        }
+        return text.append("}").toString();
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/RespContent.java
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/RespContent.java
new file mode 100644
index 00000000000..12df5ee7118
--- /dev/null
+++
b/sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/RespContent.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Jackson-mapped POJO for the JSON body of a Doris stream load HTTP response.
+ *
+ * <p>Each field is bound to its JSON key through {@link JsonProperty}; keys
+ * without a matching field are ignored during deserialization.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RespContent {
+
+    @JsonProperty("TxnId")
+    private long txnId;
+
+    @JsonProperty("Label")
+    private String label;
+
+    @JsonProperty("Status")
+    private String status;
+
+    @JsonProperty("TwoPhaseCommit")
+    private String twoPhaseCommit;
+
+    @JsonProperty("ExistingJobStatus")
+    private String existingJobStatus;
+
+    @JsonProperty("Message")
+    private String message;
+
+    @JsonProperty("NumberTotalRows")
+    private long numberTotalRows;
+
+    @JsonProperty("NumberLoadedRows")
+    private long numberLoadedRows;
+
+    // NOTE(review): the filtered/unselected counts are int while the total/loaded
+    // counts are long -- confirm the server never reports values beyond int range.
+    @JsonProperty("NumberFilteredRows")
+    private int numberFilteredRows;
+
+    @JsonProperty("NumberUnselectedRows")
+    private int numberUnselectedRows;
+
+    @JsonProperty("LoadBytes")
+    private long loadBytes;
+
+    @JsonProperty("LoadTimeMs")
+    private int loadTimeMs;
+
+    @JsonProperty("BeginTxnTimeMs")
+    private int beginTxnTimeMs;
+
+    @JsonProperty("StreamLoadPutTimeMs")
+    private int streamLoadPutTimeMs;
+
+    @JsonProperty("ReadDataTimeMs")
+    private int readDataTimeMs;
+
+    @JsonProperty("WriteDataTimeMs")
+    private int writeDataTimeMs;
+
+    @JsonProperty("CommitAndPublishTimeMs")
+    private int commitAndPublishTimeMs;
+
+    @JsonProperty("ErrorURL")
+    private String errorUrl;
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    public void setTxnId(long txnId) {
+        this.txnId = txnId;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public String getTwoPhaseCommit() {
+        return twoPhaseCommit;
+    }
+
+    public void setTwoPhaseCommit(String value) {
+        this.twoPhaseCommit = value;
+    }
+
+    public String getExistingJobStatus() {
+        return existingJobStatus;
+    }
+
+    public void setExistingJobStatus(String value) {
+        this.existingJobStatus = value;
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+    }
+
+    public long getNumberTotalRows() {
+        return numberTotalRows;
+    }
+
+    public void setNumberTotalRows(long value) {
+        this.numberTotalRows = value;
+    }
+
+    public long getNumberLoadedRows() {
+        return numberLoadedRows;
+    }
+
+    public void setNumberLoadedRows(long value) {
+        this.numberLoadedRows = value;
+    }
+
+    public int getNumberFilteredRows() {
+        return numberFilteredRows;
+    }
+
+    public void setNumberFilteredRows(int value) {
+        this.numberFilteredRows = value;
+    }
+
+    public int getNumberUnselectedRows() {
+        return numberUnselectedRows;
+    }
+
+    public void setNumberUnselectedRows(int value) {
+        this.numberUnselectedRows = value;
+    }
+
+    public long getLoadBytes() {
+        return loadBytes;
+    }
+
+    public void setLoadBytes(long value) {
+        this.loadBytes = value;
+    }
+
+    public int getLoadTimeMs() {
+        return loadTimeMs;
+    }
+
+    public void setLoadTimeMs(int value) {
+        this.loadTimeMs = value;
+    }
+
+    public int getBeginTxnTimeMs() {
+        return beginTxnTimeMs;
+    }
+
+    public void setBeginTxnTimeMs(int value) {
+        this.beginTxnTimeMs = value;
+    }
+
+    public int getStreamLoadPutTimeMs() {
+        return streamLoadPutTimeMs;
+    }
+
+    public void setStreamLoadPutTimeMs(int value) {
+        this.streamLoadPutTimeMs = value;
+    }
+
+    public int getReadDataTimeMs() {
+        return readDataTimeMs;
+    }
+
+    public void setReadDataTimeMs(int value) {
+        this.readDataTimeMs = value;
+    }
+
+    public int getWriteDataTimeMs() {
+        return writeDataTimeMs;
+    }
+
+    public void setWriteDataTimeMs(int value) {
+        this.writeDataTimeMs = value;
+    }
+
+    public int getCommitAndPublishTimeMs() {
+        return commitAndPublishTimeMs;
+    }
+
+    public void setCommitAndPublishTimeMs(int value) {
+        this.commitAndPublishTimeMs = value;
+    }
+
+    public String getErrorUrl() {
+        return errorUrl;
+    }
+
+    public void setErrorUrl(String errorUrl) {
+        this.errorUrl = errorUrl;
+    }
+
+    /** Summarizes the label, status, loaded row count, byte count and load time. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("RespContent{label='");
+        text.append(label);
+        text.append("', status='").append(status);
+        text.append("', loadedRows=").append(numberLoadedRows);
+        text.append(", loadBytes=").append(loadBytes);
+        text.append(", loadTimeMs=").append(loadTimeMs);
+        text.append("}");
+        return text.toString();
+    }
+}
diff --git a/sdk/java-doris-sdk/src/main/resources/log4j.properties
b/sdk/java-doris-sdk/src/main/resources/log4j.properties
new file mode 100644
index 00000000000..6634d0b4b44
--- /dev/null
+++ b/sdk/java-doris-sdk/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Root logger: INFO level, routed to the STDOUT appender defined below.
+log4j.rootLogger=INFO, STDOUT
+
+# STDOUT appender: console output formatted by a PatternLayout.
+log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
+log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
+# Pattern fields: timestamp, thread name, level, logger name, message, newline.
+log4j.appender.STDOUT.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS}
[%t] %-5p %c - %m%n
\ No newline at end of file
diff --git
a/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/DorisLoadClientTest.java
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/DorisLoadClientTest.java
new file mode 100644
index 00000000000..dde17985320
--- /dev/null
+++
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/DorisLoadClientTest.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.*;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the retry handling of {@link DorisLoadClient#load} against a mocked
+ * {@link StreamLoader}, plus the static backoff calculation.
+ */
+public class DorisLoadClientTest {
+
+    /** Builds a config with 2 retries, a 10 ms base interval and group commit off. */
+    private DorisConfig buildConfig() {
+        return DorisConfig.builder()
+                .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+                .user("root").password("secret")
+                .database("testdb").table("users")
+                .format(new JsonFormat(JsonFormat.Type.OBJECT_LINE))
+                .retry(new RetryConfig.Builder()
+                        .maxRetryTimes(2).baseIntervalMs(10).maxTotalTimeMs(5000).build())
+                .groupCommit(GroupCommitMode.OFF)
+                .build();
+    }
+
+    /** Builds a response body whose status is "Success". */
+    private RespContent successResp() {
+        RespContent r = new RespContent();
+        r.setStatus("Success");
+        r.setNumberLoadedRows(3);
+        r.setLoadBytes(100);
+        return r;
+    }
+
+    /**
+     * Wraps {@code text} in a stream. The bytes are encoded as UTF-8 explicitly so
+     * the payload does not depend on the platform default charset.
+     */
+    private static InputStream utf8Stream(String text) {
+        return new ByteArrayInputStream(text.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testSuccessOnFirstAttempt() throws Exception {
+        StreamLoader mockLoader = mock(StreamLoader.class);
+        when(mockLoader.execute(any())).thenReturn(LoadResponse.success(successResp()));
+
+        DorisLoadClient client = new DorisLoadClient(buildConfig(), mockLoader);
+        LoadResponse resp = client.load(utf8Stream("{\"id\":1}"));
+
+        assertEquals(LoadResponse.Status.SUCCESS, resp.getStatus());
+        verify(mockLoader, times(1)).execute(any());
+    }
+
+    @Test
+    public void testRetryOnStreamLoadException() throws Exception {
+        StreamLoader mockLoader = mock(StreamLoader.class);
+        when(mockLoader.execute(any()))
+                .thenThrow(new StreamLoadException("connection refused"))
+                .thenReturn(LoadResponse.success(successResp()));
+
+        DorisLoadClient client = new DorisLoadClient(buildConfig(), mockLoader);
+        LoadResponse resp = client.load(utf8Stream("test"));
+
+        assertEquals(LoadResponse.Status.SUCCESS, resp.getStatus());
+        // One failed attempt followed by one successful attempt.
+        verify(mockLoader, times(2)).execute(any());
+    }
+
+    @Test
+    public void testNoRetryOnBusinessFailure() throws Exception {
+        RespContent failResp = new RespContent();
+        failResp.setStatus("Fail");
+        failResp.setMessage("table not found");
+        StreamLoader mockLoader = mock(StreamLoader.class);
+        when(mockLoader.execute(any())).thenReturn(LoadResponse.failure(failResp, "table not found"));
+
+        DorisLoadClient client = new DorisLoadClient(buildConfig(), mockLoader);
+        LoadResponse resp = client.load(utf8Stream("test"));
+
+        assertEquals(LoadResponse.Status.FAILURE, resp.getStatus());
+        // Business failure should NOT be retried
+        verify(mockLoader, times(1)).execute(any());
+    }
+
+    @Test
+    public void testExhaustsAllRetries() throws Exception {
+        StreamLoader mockLoader = mock(StreamLoader.class);
+        when(mockLoader.execute(any())).thenThrow(new StreamLoadException("timeout"));
+
+        DorisLoadClient client = new DorisLoadClient(buildConfig(), mockLoader);
+        InputStream data = utf8Stream("test");
+
+        // assertThrows fails the test when no IOException is raised.
+        assertThrows(IOException.class, () -> client.load(data));
+        // 1 initial + 2 retries = 3 total attempts
+        verify(mockLoader, times(3)).execute(any());
+    }
+
+    @Test
+    public void testBackoffCalculation() {
+        // attempt=1, base=1000ms → 1000ms
+        assertEquals(1000, DorisLoadClient.calculateBackoffMs(1, 1000, 60000, 0));
+        // attempt=2, base=1000ms → 2000ms
+        assertEquals(2000, DorisLoadClient.calculateBackoffMs(2, 1000, 60000, 0));
+        // attempt=3, base=1000ms → 4000ms
+        assertEquals(4000, DorisLoadClient.calculateBackoffMs(3, 1000, 60000, 0));
+        // constrained by remaining total time
+        long constrained = DorisLoadClient.calculateBackoffMs(4, 1000, 60000, 55000);
+        assertTrue(constrained <= 5000, "constrained interval should be <= remaining time");
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/DorisConfigTest.java
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/DorisConfigTest.java
new file mode 100644
index 00000000000..2b69c0c0be9
--- /dev/null
+++
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/DorisConfigTest.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Validation tests for {@link DorisConfig} and {@link RetryConfig} builders.
+ */
+public class DorisConfigTest {
+
+    private static final String FE_ENDPOINT = "http://127.0.0.1:8030";
+
+    /** Returns a fresh line-delimited JSON format instance. */
+    private static JsonFormat objectLineJson() {
+        return new JsonFormat(JsonFormat.Type.OBJECT_LINE);
+    }
+
+    /** Returns a builder populated with endpoints, credentials, database, table and format. */
+    private DorisConfig.Builder validBuilder() {
+        DorisConfig.Builder builder = DorisConfig.builder();
+        return builder
+                .endpoints(Arrays.asList(FE_ENDPOINT))
+                .user("root")
+                .password("password")
+                .database("test_db")
+                .table("users")
+                .format(objectLineJson());
+    }
+
+    @Test
+    public void testValidConfigBuilds() {
+        DorisConfig built = validBuilder().build();
+
+        assertNotNull(built);
+        assertEquals("root", built.getUser());
+        assertEquals("test_db", built.getDatabase());
+        assertEquals("users", built.getTable());
+    }
+
+    @Test
+    public void testMissingUserThrows() {
+        assertThrows(IllegalArgumentException.class, () -> {
+            DorisConfig.builder()
+                    .endpoints(Arrays.asList(FE_ENDPOINT))
+                    .database("db").table("t")
+                    .format(objectLineJson())
+                    .build();
+        });
+    }
+
+    @Test
+    public void testMissingDatabaseThrows() {
+        assertThrows(IllegalArgumentException.class, () -> {
+            DorisConfig.builder()
+                    .endpoints(Arrays.asList(FE_ENDPOINT))
+                    .user("root").table("t")
+                    .format(objectLineJson())
+                    .build();
+        });
+    }
+
+    @Test
+    public void testMissingTableThrows() {
+        assertThrows(IllegalArgumentException.class, () -> {
+            DorisConfig.builder()
+                    .endpoints(Arrays.asList(FE_ENDPOINT))
+                    .user("root").database("db")
+                    .format(objectLineJson())
+                    .build();
+        });
+    }
+
+    @Test
+    public void testEmptyEndpointsThrows() {
+        // No endpoints(...) call at all.
+        assertThrows(IllegalArgumentException.class, () -> {
+            DorisConfig.builder()
+                    .user("root").database("db").table("t")
+                    .format(objectLineJson())
+                    .build();
+        });
+    }
+
+    @Test
+    public void testNullFormatThrows() {
+        // No format(...) call at all.
+        assertThrows(IllegalArgumentException.class, () -> {
+            DorisConfig.builder()
+                    .endpoints(Arrays.asList(FE_ENDPOINT))
+                    .user("root").database("db").table("t")
+                    .build();
+        });
+    }
+
+    @Test
+    public void testDefaultRetryValues() {
+        RetryConfig defaults = RetryConfig.defaultRetry();
+
+        assertEquals(6, defaults.getMaxRetryTimes());
+        assertEquals(1000L, defaults.getBaseIntervalMs());
+        assertEquals(60000L, defaults.getMaxTotalTimeMs());
+    }
+
+    @Test
+    public void testNegativeRetryTimesThrows() {
+        assertThrows(IllegalArgumentException.class, () -> {
+            RetryConfig.builder()
+                    .maxRetryTimes(-1)
+                    .baseIntervalMs(1000)
+                    .maxTotalTimeMs(60000)
+                    .build();
+        });
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/FormatTest.java
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/FormatTest.java
new file mode 100644
index 00000000000..034db7be69b
--- /dev/null
+++
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/FormatTest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.config;
+
+import org.junit.jupiter.api.Test;
+import java.util.Map;
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Checks the stream load headers and format type reported by {@link JsonFormat}
+ * and {@link CsvFormat}.
+ */
+public class FormatTest {
+
+    @Test
+    public void testJsonObjectLineHeaders() {
+        JsonFormat lineJson = new JsonFormat(JsonFormat.Type.OBJECT_LINE);
+        Map<String, String> actual = lineJson.getHeaders();
+
+        assertEquals("json", actual.get("format"));
+        assertEquals("true", actual.get("read_json_by_line"));
+        assertEquals("false", actual.get("strip_outer_array"));
+    }
+
+    @Test
+    public void testJsonArrayHeaders() {
+        JsonFormat arrayJson = new JsonFormat(JsonFormat.Type.ARRAY);
+        Map<String, String> actual = arrayJson.getHeaders();
+
+        assertEquals("json", actual.get("format"));
+        assertEquals("true", actual.get("strip_outer_array"));
+        // The array variant carries no read_json_by_line entry.
+        assertNull(actual.get("read_json_by_line"));
+    }
+
+    @Test
+    public void testCsvHeaders() {
+        CsvFormat csv = new CsvFormat(",", "\\n");
+        Map<String, String> actual = csv.getHeaders();
+
+        assertEquals("csv", actual.get("format"));
+        assertEquals(",", actual.get("column_separator"));
+        assertEquals("\\n", actual.get("line_delimiter"));
+    }
+
+    @Test
+    public void testJsonFormatType() {
+        assertEquals("json", new JsonFormat(JsonFormat.Type.OBJECT_LINE).getFormatType());
+    }
+
+    @Test
+    public void testCsvFormatType() {
+        assertEquals("csv", new CsvFormat(",", "\\n").getFormatType());
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/internal/RequestBuilderTest.java
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/internal/RequestBuilderTest.java
new file mode 100644
index 00000000000..113880ec48c
--- /dev/null
+++
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/internal/RequestBuilderTest.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.*;
+import org.apache.http.client.methods.HttpPut;
+import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests the URL and headers of the {@link HttpPut} produced by
+ * {@link RequestBuilder#build}.
+ */
+public class RequestBuilderTest {
+
+    /** Builds a JSON-by-line config for testdb.users with the given group commit mode. */
+    private DorisConfig buildConfig(GroupCommitMode gcm) {
+        return DorisConfig.builder()
+                .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+                .user("root").password("secret")
+                .database("testdb").table("users")
+                .format(new JsonFormat(JsonFormat.Type.OBJECT_LINE))
+                .groupCommit(gcm)
+                .build();
+    }
+
+    /**
+     * Encodes {@code text} as UTF-8 explicitly so request bodies do not depend on
+     * the platform default charset.
+     */
+    private static byte[] utf8(String text) {
+        return text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testBasicAuthHeader() throws Exception {
+        DorisConfig config = buildConfig(GroupCommitMode.OFF);
+        HttpPut req = RequestBuilder.build(config, utf8("{\"id\":1}"), 0);
+        String auth = req.getFirstHeader("Authorization").getValue();
+        assertTrue(auth.startsWith("Basic "));
+        // Base64("root:secret") = "cm9vdDpzZWNyZXQ="
+        assertTrue(auth.contains("cm9vdDpzZWNyZXQ="));
+    }
+
+    @Test
+    public void testUrlPattern() throws Exception {
+        DorisConfig config = buildConfig(GroupCommitMode.OFF);
+        HttpPut req = RequestBuilder.build(config, utf8("test"), 0);
+        String url = req.getURI().toString();
+        assertTrue(url.contains("/api/testdb/users/_stream_load"));
+    }
+
+    @Test
+    public void testJsonFormatHeaders() throws Exception {
+        DorisConfig config = buildConfig(GroupCommitMode.OFF);
+        HttpPut req = RequestBuilder.build(config, utf8("{}"), 0);
+        assertEquals("json", req.getFirstHeader("format").getValue());
+        assertEquals("true", req.getFirstHeader("read_json_by_line").getValue());
+    }
+
+    @Test
+    public void testLabelSetWhenGroupCommitOff() throws Exception {
+        DorisConfig config = buildConfig(GroupCommitMode.OFF);
+        HttpPut req = RequestBuilder.build(config, utf8("test"), 0);
+        assertNotNull(req.getFirstHeader("label"));
+    }
+
+    @Test
+    public void testLabelNotSetWhenGroupCommitAsync() throws Exception {
+        DorisConfig config = buildConfig(GroupCommitMode.ASYNC);
+        HttpPut req = RequestBuilder.build(config, utf8("test"), 0);
+        assertNull(req.getFirstHeader("label"));
+        assertEquals("async_mode", req.getFirstHeader("group_commit").getValue());
+    }
+
+    @Test
+    public void testGzipHeader() throws Exception {
+        DorisConfig config = DorisConfig.builder()
+                .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+                .user("root").password("").database("db").table("t")
+                .format(new CsvFormat(",", "\\n"))
+                .enableGzip(true)
+                .build();
+        HttpPut req = RequestBuilder.build(config, utf8("1,a"), 0);
+        assertEquals("gz", req.getFirstHeader("compress_type").getValue());
+    }
+
+    @Test
+    public void testRetryLabelHasSuffix() throws Exception {
+        DorisConfig config = buildConfig(GroupCommitMode.OFF);
+        // The third argument is 0 for the first request and 1 for the second.
+        HttpPut req0 = RequestBuilder.build(config, utf8("test"), 0);
+        HttpPut req1 = RequestBuilder.build(config, utf8("test"), 1);
+        String label0 = req0.getFirstHeader("label").getValue();
+        String label1 = req1.getFirstHeader("label").getValue();
+        assertTrue(label1.contains("retry"), "retry label must contain 'retry'");
+        assertNotEquals(label0, label1);
+    }
+
+    @Test
+    public void testCustomLabelUsedOnFirstAttempt() throws Exception {
+        DorisConfig config = DorisConfig.builder()
+                .endpoints(Arrays.asList("http://127.0.0.1:8030"))
+                .user("root").password("").database("db").table("t")
+                .format(new JsonFormat(JsonFormat.Type.OBJECT_LINE))
+                .label("my_custom_label")
+                .build();
+        HttpPut req = RequestBuilder.build(config, utf8("test"), 0);
+        assertEquals("my_custom_label", req.getFirstHeader("label").getValue());
+    }
+}
diff --git
a/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/model/LoadResponseTest.java
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/model/LoadResponseTest.java
new file mode 100644
index 00000000000..16a553f5ec4
--- /dev/null
+++
b/sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/model/LoadResponseTest.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.load.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests JSON deserialization into {@link RespContent} and the
+ * {@link LoadResponse} factory methods.
+ */
+public class LoadResponseTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    /** Sample response body with status "Success". */
+    private static final String SUCCESS_JSON = "{"
+            + "\"TxnId\":1001,"
+            + "\"Label\":\"load_test_20260319_abc123\","
+            + "\"Status\":\"Success\","
+            + "\"TwoPhaseCommit\":\"false\","
+            + "\"Message\":\"\","
+            + "\"NumberTotalRows\":100,"
+            + "\"NumberLoadedRows\":100,"
+            + "\"NumberFilteredRows\":0,"
+            + "\"NumberUnselectedRows\":0,"
+            + "\"LoadBytes\":2048,"
+            + "\"LoadTimeMs\":350,"
+            + "\"BeginTxnTimeMs\":5,"
+            + "\"StreamLoadPutTimeMs\":10,"
+            + "\"ReadDataTimeMs\":200,"
+            + "\"WriteDataTimeMs\":130,"
+            + "\"CommitAndPublishTimeMs\":5,"
+            + "\"ErrorURL\":\"\""
+            + "}";
+
+    @Test
+    public void testDeserializeSuccess() throws Exception {
+        RespContent parsed = MAPPER.readValue(SUCCESS_JSON, RespContent.class);
+
+        assertEquals(1001L, parsed.getTxnId());
+        assertEquals("load_test_20260319_abc123", parsed.getLabel());
+        assertEquals("Success", parsed.getStatus());
+        assertEquals(100L, parsed.getNumberLoadedRows());
+        assertEquals(2048L, parsed.getLoadBytes());
+        assertEquals(350, parsed.getLoadTimeMs());
+    }
+
+    @Test
+    public void testLoadResponseSuccess() {
+        RespContent body = new RespContent();
+        body.setStatus("Success");
+
+        LoadResponse outcome = LoadResponse.success(body);
+
+        assertEquals(LoadResponse.Status.SUCCESS, outcome.getStatus());
+        assertNull(outcome.getErrorMessage());
+        assertNotNull(outcome.getRespContent());
+    }
+
+    @Test
+    public void testLoadResponseFailure() {
+        RespContent body = new RespContent();
+        body.setStatus("Fail");
+        body.setMessage("table not found");
+
+        LoadResponse outcome = LoadResponse.failure(body, "load failed. cause by: table not found");
+
+        assertEquals(LoadResponse.Status.FAILURE, outcome.getStatus());
+        assertEquals("load failed. cause by: table not found", outcome.getErrorMessage());
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]