Copilot commented on code in PR #61626:
URL: https://github.com/apache/doris/pull/61626#discussion_r2973588283
##########
sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/FormatTest.java:
##########
@@ -0,0 +1,47 @@
+package org.apache.doris.sdk.load.config;
+
+import org.junit.Test;
Review Comment:
This new test file is missing the standard ASF license header comment block
used across Doris Java sources/tests. Add the Apache license header for
consistency/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Executes a single HTTP PUT request and parses the Doris response.
+ * Thread-safe: CloseableHttpClient is shared and reused across calls.
+ *
+ * Uses LaxRedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(StreamLoader.class);
+ private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+ private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+ private final HttpClientBuilder httpClientBuilder;
+ private final ObjectMapper objectMapper;
+
+ public StreamLoader() {
+ this.httpClientBuilder = buildHttpClient();
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /** Package-private constructor for testing with a mock HTTP client. */
+ StreamLoader(HttpClientBuilder httpClientBuilder) {
+ this.httpClientBuilder = httpClientBuilder;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Executes the HTTP PUT request and returns a LoadResponse.
+ *
+ * @throws StreamLoadException for retryable HTTP-level errors (non-200
status, connection failure)
+ * @throws IOException for unrecoverable I/O errors
+ */
+ public LoadResponse execute(HttpPut request) throws IOException {
+ log.debug("Sending HTTP PUT to {}", request.getURI());
+ long start = System.currentTimeMillis();
+
+ try (CloseableHttpClient httpClient = httpClientBuilder.build();
+ CloseableHttpResponse response = httpClient.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
Review Comment:
The class Javadoc says the `CloseableHttpClient` is shared and reused across
calls, but `execute()` builds and closes a new client on every request
(`httpClientBuilder.build()` inside try-with-resources). Because every client
built this way gets its own connection pool, no connections are reused between
loads, which hurts performance; it also makes `close()` a no-op. Consider
building a single `CloseableHttpClient` once (stored in a field), reusing it
across calls, and closing it in `close()`; alternatively, update the Javadoc if
per-request clients are intentional.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/CsvFormat.java:
##########
@@ -0,0 +1,35 @@
+package org.apache.doris.sdk.load.config;
+
+import java.util.HashMap;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Builds HttpPut requests for Doris stream load.
+ * Handles header assembly, label generation, and group commit logic.
+ */
+public class RequestBuilder {
+
+ private static final Logger log =
LoggerFactory.getLogger(RequestBuilder.class);
+ private static final String STREAM_LOAD_PATTERN =
"http://%s/api/%s/%s/_stream_load";
+ private static final Random RANDOM = new Random();
+
+ /**
+ * Builds an HttpPut request for the given config and data.
+ *
+ * @param config DorisConfig
+ * @param data pre-buffered (and optionally pre-compressed) request
body bytes
+ * @param attempt 0 = first attempt, >0 = retry number
+ */
+ public static HttpPut build(DorisConfig config, byte[] data, int attempt)
throws Exception {
+ String host = pickEndpoint(config.getEndpoints());
+ String url = String.format(STREAM_LOAD_PATTERN, host,
config.getDatabase(), config.getTable());
+
+ HttpPut request = new HttpPut(url);
+ request.setEntity(new ByteArrayEntity(data));
+
+ // Basic auth
+ String credentials = config.getUser() + ":" + config.getPassword();
+ String encoded =
Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
+ request.setHeader("Authorization", "Basic " + encoded);
+ request.setHeader("Expect", "100-continue");
+
+ // Build and apply all stream load headers
+ Map<String, String> allHeaders = buildStreamLoadHeaders(config);
+ for (Map.Entry<String, String> entry : allHeaders.entrySet()) {
+ request.setHeader(entry.getKey(), entry.getValue());
+ }
+
+ // Label handling: skip labels when group commit is enabled
+ boolean groupCommitEnabled = allHeaders.containsKey("group_commit");
+ if (groupCommitEnabled) {
+ if (config.getLabel() != null && !config.getLabel().isEmpty()) {
+ log.warn("Custom label '{}' specified but group_commit is
enabled. Removing label.", config.getLabel());
+ }
+ if (config.getLabelPrefix() != null &&
!config.getLabelPrefix().isEmpty()) {
+ log.warn("Label prefix '{}' specified but group_commit is
enabled. Removing label prefix.", config.getLabelPrefix());
+ }
+ log.info("Group commit enabled - labels removed from request
headers");
Review Comment:
When group commit is enabled you log that labels are removed, but
`buildStreamLoadHeaders()` blindly copies `config.getOptions()` into the headers
first. If a user sets a label through the options map
(`options.put("label", ...)`), that `label` header is still sent even though
group commit is enabled. Consider explicitly removing the `label` header that
came from the options (and logging a warning) whenever `group_commit` is set.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/GroupCommitMode.java:
##########
@@ -0,0 +1,14 @@
+package org.apache.doris.sdk.load.config;
+
+/**
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/DorisConfig.java:
##########
@@ -0,0 +1,129 @@
+package org.apache.doris.sdk.load.config;
+
+import java.util.Collections;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/DorisClient.java:
##########
@@ -0,0 +1,65 @@
+package org.apache.doris.sdk;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/pom.xml:
##########
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-doris-sdk</artifactId>
+ <version>1.0.0</version>
+ <packaging>jar</packaging>
+
+ <name>Apache Doris Java SDK</name>
+ <description>A lightweight Apache Doris Stream Load client (Java
version)</description>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <!-- HTTP client -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
+
+ <!-- JSON serialization -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.13.4</version>
+ </dependency>
+
+ <!-- Logging facade -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.9</version>
Review Comment:
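This module depends on `slf4j-log4j12`, which pulls in Log4j 1.x
(end-of-life and affected by known security issues). As a library, the SDK also
should not force a concrete logging binding on the applications that use it,
because that binding can conflict with the application's own SLF4J binding. Keep
the main artifact depending only on `slf4j-api`. If a backend is needed for
tests, declare a modern one (e.g., logback-classic, or Log4j2 via
`log4j-slf4j-impl`) with `test` scope.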
```suggestion
<!-- Logging facade: depend only on SLF4J API, let consumers choose
backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/Format.java:
##########
@@ -0,0 +1,15 @@
+package org.apache.doris.sdk.load.config;
+
+import java.util.Map;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/exception/StreamLoadException.java:
##########
@@ -0,0 +1,16 @@
+package org.apache.doris.sdk.load.exception;
+
+/**
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/DorisLoadClientTest.java:
##########
@@ -0,0 +1,112 @@
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.*;
Review Comment:
This new test file is missing the standard ASF license header comment block
used across Doris Java sources/tests. Add the Apache license header for
consistency/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.internal.RequestBuilder;
+import org.apache.doris.sdk.load.internal.StreamLoader;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Thread-safe Doris stream load client.
+ *
+ * <p>Usage:
+ * <pre>
+ * DorisLoadClient client = new DorisLoadClient(config);
+ * LoadResponse resp = client.load(new ByteArrayInputStream(data));
+ * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... }
+ * </pre>
+ *
+ * <p>Thread safety: this instance can be shared across threads.
+ * Each {@link #load} call must receive an independent InputStream.
+ */
+public class DorisLoadClient implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(DorisLoadClient.class);
+ /** Absolute maximum for a single backoff interval: 5 minutes. */
+ private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L;
+
+ private final DorisConfig config;
+ private final StreamLoader streamLoader;
+
+ public DorisLoadClient(DorisConfig config) {
+ this.config = config;
+ this.streamLoader = new StreamLoader();
+ }
+
+ /** Package-private constructor for testing with a mock StreamLoader. */
+ DorisLoadClient(DorisConfig config, StreamLoader streamLoader) {
+ this.config = config;
+ this.streamLoader = streamLoader;
+ }
+
+ /**
+ * Loads data from the given InputStream into Doris via stream load.
+ * The InputStream is fully consumed and buffered before the first attempt.
+ * Retries with exponential backoff on retryable errors (network/HTTP
failures).
+ * Business failures (bad data, schema mismatch, auth) are returned
immediately without retry.
+ *
+ * @param inputStream data to load (consumed once; must not be shared
across threads)
+ * @return LoadResponse with status SUCCESS or FAILURE
+ * @throws IOException if the stream cannot be read or all retries are
exhausted
+ */
+ public LoadResponse load(InputStream inputStream) throws IOException {
+ int maxRetries = 6;
+ long baseIntervalMs = 1000L;
+ long maxTotalTimeMs = 60000L;
+
+ if (config.getRetry() != null) {
+ maxRetries = config.getRetry().getMaxRetryTimes();
+ baseIntervalMs = config.getRetry().getBaseIntervalMs();
+ maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs();
+ }
+
+ log.info("Starting stream load: {}.{}", config.getDatabase(),
config.getTable());
+
+ // Buffer the InputStream once so retries can replay the body
+ byte[] bodyData = readAll(inputStream);
+
+ // Compress once before the retry loop (avoids re-compressing on each
retry)
+ if (config.isEnableGzip()) {
+ bodyData = gzipCompress(bodyData);
+ }
+ Exception lastException = null;
+ LoadResponse lastResponse = null;
+ long totalRetryTimeMs = 0L;
+ long operationStart = System.currentTimeMillis();
+
+ for (int attempt = 0; attempt <= maxRetries; attempt++) {
+ if (attempt > 0) {
+ log.info("Retry attempt {}/{}", attempt, maxRetries);
+ long backoff = calculateBackoffMs(attempt, baseIntervalMs,
maxTotalTimeMs, totalRetryTimeMs);
+
+ if (maxTotalTimeMs > 0 && totalRetryTimeMs + backoff >
maxTotalTimeMs) {
+ log.warn("Next retry backoff ({}ms) would exceed total
limit ({}ms). Stopping.", backoff, maxTotalTimeMs);
+ break;
+ }
+
+ log.info("Waiting {}ms before retry (total retry time so far:
{}ms)", backoff, totalRetryTimeMs);
+ sleep(backoff);
+ totalRetryTimeMs += backoff;
+ } else {
+ log.info("Initial load attempt");
+ }
+
+ try {
+ HttpPut request = RequestBuilder.build(config, bodyData,
attempt);
+ lastResponse = streamLoader.execute(request);
+
+ if (lastResponse.getStatus() == LoadResponse.Status.SUCCESS) {
+ log.info("Stream load succeeded on attempt {}", attempt +
1);
+ return lastResponse;
+ }
+
+ // Business failure (bad data, schema mismatch, auth) — do not
retry
+ log.error("Load failed (non-retryable): {}",
lastResponse.getErrorMessage());
+ return lastResponse;
+
+ } catch (StreamLoadException e) {
+ // Retryable: network error, HTTP 5xx, etc.
+ lastException = e;
+ log.error("Attempt {} failed with retryable error: ", attempt
+ 1, e);
+ } catch (Exception e) {
+ // Wrap unexpected exceptions as retryable
+ lastException = new StreamLoadException("Unexpected error
building request: " + e.getMessage(), e);
+ log.error("Attempt {} failed with unexpected error: {}",
attempt + 1, e.getMessage());
+
+ if (attempt == maxRetries) {
+ log.warn("Reached maximum retry attempts ({}), stopping.",
maxRetries);
+ break;
+ }
+
+ // Check elapsed wall-clock time
+ long elapsed = System.currentTimeMillis() - operationStart;
+ if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) {
+ log.warn("Total elapsed time ({}ms) exceeded limit ({}ms),
stopping retries.", elapsed, maxTotalTimeMs);
+ break;
+ }
+ }
Review Comment:
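Retry time limiting is inconsistent: the wall-clock `elapsed >
maxTotalTimeMs` check is only applied in the generic `Exception` catch, not in
the `StreamLoadException` (retryable) path. The pre-sleep check does not make up
for this, because `totalRetryTimeMs` only accumulates backoff sleeps, not the
time spent executing the requests themselves (a stalled attempt can block for up
to `SOCKET_TIMEOUT_MS`, i.e. 9 minutes). With repeated network/HTTP failures,
retries can therefore continue well beyond `maxTotalTimeMs`. In addition,
`calculateBackoffMs()` can return 0, which may cause a tight retry loop. Apply
the same elapsed/remaining-time stop condition to `StreamLoadException` retries,
and stop when the computed backoff is 0 because no time remains.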
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Builds HttpPut requests for Doris stream load.
+ * Handles header assembly, label generation, and group commit logic.
+ */
+public class RequestBuilder {
+
+ private static final Logger log =
LoggerFactory.getLogger(RequestBuilder.class);
+ private static final String STREAM_LOAD_PATTERN =
"http://%s/api/%s/%s/_stream_load";
+ private static final Random RANDOM = new Random();
+
+ /**
+ * Builds an HttpPut request for the given config and data.
+ *
+ * @param config DorisConfig
+ * @param data pre-buffered (and optionally pre-compressed) request
body bytes
+ * @param attempt 0 = first attempt, >0 = retry number
+ */
+ public static HttpPut build(DorisConfig config, byte[] data, int attempt)
throws Exception {
+ String host = pickEndpoint(config.getEndpoints());
+ String url = String.format(STREAM_LOAD_PATTERN, host,
config.getDatabase(), config.getTable());
+
Review Comment:
`STREAM_LOAD_PATTERN` hard-codes the `http://` scheme, while
`pickEndpoint()` strips both `http://` and `https://`. As a result, configuring
an `https://...` endpoint silently produces an `http://...` request URL. The
Basic `Authorization` header, which carries the user and password, is then sent
in plaintext. Preserve the original endpoint scheme when building the URI (e.g.,
keep the full endpoint as the base URL and append
`/api/<db>/<table>/_stream_load`).
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SimpleConfigExample.java:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.model.LoadResponse;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple configuration example — basic JSON Lines load with Group Commit
ASYNC.
+ * Mirrors Go SDK's SimpleConfigExample.
+ */
+public class SimpleConfigExample {
+
+ public static void run() {
+ Map<String, String> options = new HashMap<>();
+ options.put("strict_mode", "true");
+ options.put("max_filter_ratio", "0.1");
+
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://10.16.10.6:38535"))
Review Comment:
This example hard-codes a private/internal endpoint IP
(`http://10.16.10.6:38535`). In an Apache repo, examples should use
placeholders (e.g., `http://127.0.0.1:8030` or `http://fe-host:8030`) to avoid
leaking internal network details and to make the sample copy/paste friendly.
```suggestion
.endpoints(Arrays.asList("http://127.0.0.1:8030"))
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/RespContent.java:
##########
@@ -0,0 +1,75 @@
+package org.apache.doris.sdk.load.model;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/JsonExample.java:
##########
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.config.JsonFormat;
+import org.apache.doris.sdk.load.config.RetryConfig;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Production-level JSON data loading demo.
+ * Loads 50,000 JSON Lines order records in a single stream load call.
+ * Mirrors Go SDK's RunJSONExample.
+ */
+public class JsonExample {
+
+ private static final Logger log =
LoggerFactory.getLogger(JsonExample.class);
+ private static final int JSON_BATCH_SIZE = 50_000;
+
+ public static void run() {
+ System.out.println("=== Production-Level JSON Data Loading Demo ===");
+ log.info("Starting JSON loading demo with {} order records",
JSON_BATCH_SIZE);
+
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://10.16.10.6:38535"))
Review Comment:
This example hard-codes a private/internal endpoint IP
(`http://10.16.10.6:38535`). In an Apache repo, examples should use
placeholders (e.g., `http://127.0.0.1:8030` or `http://fe-host:8030`) to avoid
leaking internal network details and to make the sample copy/paste friendly.
```suggestion
.endpoints(Arrays.asList("http://127.0.0.1:8030"))
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/RetryConfig.java:
##########
@@ -0,0 +1,49 @@
+package org.apache.doris.sdk.load.config;
+
+/**
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/GzipExample.java:
##########
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.model.LoadResponse;
+
+import java.util.Arrays;
+
+/**
+ * Gzip compression example.
+ * The SDK compresses the request body transparently before sending.
+ * No need to pre-compress the data — just set enableGzip(true).
+ * Mirrors Go SDK's GzipExample.
+ */
+public class GzipExample {
+
+ public static void run() {
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://10.16.10.6:38535"))
Review Comment:
This example hard-codes a private/internal endpoint IP
(`http://10.16.10.6:38535`). In an Apache repo, examples should use
placeholders (e.g., `http://127.0.0.1:8030` or `http://fe-host:8030`) to avoid
leaking internal network details and to make the sample copy/paste friendly.
```suggestion
.endpoints(Arrays.asList("http://127.0.0.1:8030"))
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java:
##########
@@ -0,0 +1,135 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.exception.StreamLoadException;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.apache.doris.sdk.load.model.RespContent;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Executes a single HTTP PUT request and parses the Doris response.
+ * Thread-safe: CloseableHttpClient is shared and reused across calls.
+ *
+ * Uses LaxRedirectStrategy so that 307 redirects on PUT requests are
+ * followed automatically (Doris FE redirects stream load to BE).
+ */
+public class StreamLoader implements AutoCloseable {
+
+ private static final Logger log =
LoggerFactory.getLogger(StreamLoader.class);
+ private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000;
+ private static final int CONNECT_TIMEOUT_MS = 60_000;
+
+ private final HttpClientBuilder httpClientBuilder;
+ private final ObjectMapper objectMapper;
+
+ public StreamLoader() {
+ this.httpClientBuilder = buildHttpClient();
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /** Package-private constructor for testing with a mock HTTP client. */
+ StreamLoader(HttpClientBuilder httpClientBuilder) {
+ this.httpClientBuilder = httpClientBuilder;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Executes the HTTP PUT request and returns a LoadResponse.
+ *
+ * @throws StreamLoadException for retryable HTTP-level errors (non-200
status, connection failure)
+ * @throws IOException for unrecoverable I/O errors
+ */
+ public LoadResponse execute(HttpPut request) throws IOException {
+ log.debug("Sending HTTP PUT to {}", request.getURI());
+ long start = System.currentTimeMillis();
+
+ try (CloseableHttpClient httpClient = httpClientBuilder.build();
+ CloseableHttpResponse response = httpClient.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ log.debug("HTTP response status: {}", statusCode);
+ log.debug("HTTP request completed in {} ms",
System.currentTimeMillis() - start);
+
+ if (statusCode == 200) {
+ return parseResponse(response);
+ } else {
+ // Non-200 is retryable (e.g. 503, 429)
+ throw new StreamLoadException("stream load error: " +
response.getStatusLine().toString());
+ }
+ } catch (StreamLoadException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new StreamLoadException("stream load request failed: " +
e.getMessage(), e);
+ }
+ }
+
+ private LoadResponse parseResponse(CloseableHttpResponse response) throws
IOException {
+ byte[] bodyBytes = EntityUtils.toByteArray(response.getEntity());
+ String body = new String(bodyBytes, StandardCharsets.UTF_8);
+ log.info("Stream Load Response: {}", body);
+
+ RespContent resp = objectMapper.readValue(body, RespContent.class);
+
+ if (isSuccess(resp.getStatus())) {
+ log.info("Load operation completed successfully");
+ return LoadResponse.success(resp);
+ } else {
+ log.error("Load operation failed with status: {}",
resp.getStatus());
+ String errorMsg;
+ if (resp.getMessage() != null && !resp.getMessage().isEmpty()) {
+ errorMsg = "load failed. cause by: " + resp.getMessage()
+ + ", please check more detail from url: " +
resp.getErrorUrl();
+ } else {
+ errorMsg = body;
+ }
+ return LoadResponse.failure(resp, errorMsg);
+ }
+ }
+
+ private static boolean isSuccess(String status) {
+ return "success".equalsIgnoreCase(status);
+ }
+
+ private static HttpClientBuilder buildHttpClient() {
+ try {
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setConnectTimeout(CONNECT_TIMEOUT_MS)
+ .setSocketTimeout(SOCKET_TIMEOUT_MS)
+ .setConnectionRequestTimeout(CONNECT_TIMEOUT_MS)
+ .build();
+
+ return HttpClientBuilder.create()
+ .setDefaultRequestConfig(requestConfig)
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ return true;
+ }
+ })
+ .setSSLSocketFactory(
+ new SSLConnectionSocketFactory(
+ SSLContextBuilder.create()
+ .loadTrustMaterial(null, (chain,
authType) -> true)
+ .build(),
+ NoopHostnameVerifier.INSTANCE));
Review Comment:
The HTTP client is configured to trust all TLS certificates and disable
hostname verification (`loadTrustMaterial(... -> true)` +
`NoopHostnameVerifier`). This makes HTTPS connections vulnerable to
man-in-the-middle (MITM) attacks, including interception of the Basic-auth
credentials sent with every load. This should not be the default in a production
SDK. Use the default `SSLContext` and hostname verifier, or add an explicit,
opt-in `insecureSkipVerify` option intended only for testing.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java:
##########
@@ -0,0 +1,206 @@
+package org.apache.doris.sdk.load;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/ConcurrentExample.java:
##########
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.config.RetryConfig;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Production-level concurrent large-scale data loading demo.
+ * Loads 1,000,000 records using 10 concurrent threads, each loading 100,000
records.
+ *
+ * <p>Thread safety: DorisLoadClient is shared across all workers.
+ * Each worker uses its own independent InputStream — never share streams
across threads.
+ *
+ * Mirrors Go SDK's RunConcurrentExample.
+ */
+public class ConcurrentExample {
+
+ private static final Logger log =
LoggerFactory.getLogger(ConcurrentExample.class);
+
+ private static final int TOTAL_RECORDS = 1_000_000;
+ private static final int NUM_WORKERS = 10;
+ private static final int RECORDS_PER_WORKER = TOTAL_RECORDS / NUM_WORKERS;
+
+ /** Per-worker result. */
+ static class WorkerResult {
+ final int workerId;
+ final boolean success;
+ final int recordsLoaded;
+ final long dataSizeBytes;
+ final long loadTimeMs;
+ final String error;
+
+ WorkerResult(int workerId, boolean success, int recordsLoaded,
+ long dataSizeBytes, long loadTimeMs, String error) {
+ this.workerId = workerId;
+ this.success = success;
+ this.recordsLoaded = recordsLoaded;
+ this.dataSizeBytes = dataSizeBytes;
+ this.loadTimeMs = loadTimeMs;
+ this.error = error;
+ }
+ }
+
+ public static void run() {
+ System.out.println("=== Production-Level Concurrent Large-Scale
Loading Demo ===");
+ System.out.printf("Scale: %d total records, %d workers, %d records per
worker%n",
+ TOTAL_RECORDS, NUM_WORKERS, RECORDS_PER_WORKER);
+
+ // Single shared client — thread-safe
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://10.16.10.6:38535"))
Review Comment:
This example hard-codes a private/internal endpoint IP
(`http://10.16.10.6:38535`). In an Apache repository, examples should use
placeholders (e.g., `http://127.0.0.1:8030` or `http://fe-host:8030`) to avoid
leaking internal network details and to make the sample copy-paste friendly.
```suggestion
.endpoints(Arrays.asList("http://127.0.0.1:8030"))
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/config/JsonFormat.java:
##########
@@ -0,0 +1,46 @@
+package org.apache.doris.sdk.load.config;
+
+import java.util.HashMap;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/internal/RequestBuilderTest.java:
##########
@@ -0,0 +1,96 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.*;
Review Comment:
This new test file is missing the standard ASF license header comment block
used across Doris Java sources/tests. Add the Apache license header for
consistency/compliance.
##########
sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/config/DorisConfigTest.java:
##########
@@ -0,0 +1,83 @@
+package org.apache.doris.sdk.load.config;
+
+import org.junit.Test;
Review Comment:
This new test file is missing the standard ASF license header comment block
used across Doris Java sources/tests. Add the Apache license header for
consistency/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/SingleBatchExample.java:
##########
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.sdk.examples;
+
+import org.apache.doris.sdk.DorisClient;
+import org.apache.doris.sdk.load.DorisLoadClient;
+import org.apache.doris.sdk.load.config.DorisConfig;
+import org.apache.doris.sdk.load.config.GroupCommitMode;
+import org.apache.doris.sdk.load.config.RetryConfig;
+import org.apache.doris.sdk.load.model.LoadResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Production-level single-threaded large batch loading demo.
+ * Loads 100,000 CSV order records in a single stream load call.
+ * Mirrors Go SDK's RunSingleBatchExample.
+ */
+public class SingleBatchExample {
+
+ private static final Logger log =
LoggerFactory.getLogger(SingleBatchExample.class);
+ private static final int BATCH_SIZE = 100_000;
+
+ public static void run() {
+ System.out.println("=== Production-Level Large Batch Loading Demo
===");
+ log.info("Starting large batch loading demo with {} order records",
BATCH_SIZE);
+
+ DorisConfig config = DorisConfig.builder()
+ .endpoints(Arrays.asList("http://10.16.10.6:38535"))
Review Comment:
This example hard-codes a private/internal endpoint IP
(`http://10.16.10.6:38535`). In an Apache repository, examples should use
placeholders (e.g., `http://127.0.0.1:8030` or `http://fe-host:8030`) to avoid
leaking internal network details and to make the sample copy-paste friendly.
```suggestion
.endpoints(Arrays.asList("http://127.0.0.1:8030"))
```
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java:
##########
@@ -0,0 +1,144 @@
+package org.apache.doris.sdk.load.internal;
+
+import org.apache.doris.sdk.load.config.DorisConfig;
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/model/LoadResponse.java:
##########
@@ -0,0 +1,39 @@
+package org.apache.doris.sdk.load.model;
+
+/**
Review Comment:
This new source file is missing the standard ASF license header comment
block that appears at the top of other Doris Java files. Add the Apache license
header to align with repository conventions/compliance.
##########
sdk/java-doris-sdk/src/test/java/org/apache/doris/sdk/load/model/LoadResponseTest.java:
##########
@@ -0,0 +1,61 @@
+package org.apache.doris.sdk.load.model;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
Review Comment:
This new test file is missing the standard ASF license header comment block
used across Doris Java sources/tests. Add the Apache license header for
consistency/compliance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]