Copilot commented on code in PR #61626: URL: https://github.com/apache/doris/pull/61626#discussion_r2973885313
########## sdk/java-doris-sdk/src/main/resources/log4j.properties: ########## @@ -0,0 +1,23 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=INFO, STDOUT + +log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender +log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout +log4j.appender.STDOUT.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n Review Comment: Shipping a `log4j.properties` in `src/main/resources` means the SDK jar can override an application's logging configuration at runtime. Libraries typically should not include logging config; move this to `src/test/resources` or the examples module, and let applications choose/configure their own logging backend. ```suggestion ################################################################################ ``` ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java: ########## @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.load; + +import org.apache.doris.sdk.load.config.DorisConfig; +import org.apache.doris.sdk.load.exception.StreamLoadException; +import org.apache.doris.sdk.load.internal.RequestBuilder; +import org.apache.doris.sdk.load.internal.StreamLoader; +import org.apache.doris.sdk.load.model.LoadResponse; +import org.apache.http.client.methods.HttpPut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Thread-safe Doris stream load client. + * + * <p>Usage: + * <pre> + * DorisLoadClient client = new DorisLoadClient(config); + * LoadResponse resp = client.load(new ByteArrayInputStream(data)); + * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... } + * </pre> + * + * <p>Thread safety: this instance can be shared across threads. + * Each {@link #load} call must receive an independent InputStream. + */ +public class DorisLoadClient implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(DorisLoadClient.class); + /** Absolute maximum for a single backoff interval: 5 minutes. 
*/ + private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L; + + private final DorisConfig config; + private final StreamLoader streamLoader; + + public DorisLoadClient(DorisConfig config) { + this.config = config; + this.streamLoader = new StreamLoader(); + } + + /** Package-private constructor for testing with a mock StreamLoader. */ + DorisLoadClient(DorisConfig config, StreamLoader streamLoader) { + this.config = config; + this.streamLoader = streamLoader; + } + + /** + * Loads data from the given InputStream into Doris via stream load. + * The InputStream is fully consumed and buffered before the first attempt. + * Retries with exponential backoff on retryable errors (network/HTTP failures). + * Business failures (bad data, schema mismatch, auth) are returned immediately without retry. + * + * @param inputStream data to load (consumed once; must not be shared across threads) + * @return LoadResponse with status SUCCESS or FAILURE + * @throws IOException if the stream cannot be read or all retries are exhausted + */ + public LoadResponse load(InputStream inputStream) throws IOException { + int maxRetries = 6; + long baseIntervalMs = 1000L; + long maxTotalTimeMs = 60000L; + + if (config.getRetry() != null) { + maxRetries = config.getRetry().getMaxRetryTimes(); + baseIntervalMs = config.getRetry().getBaseIntervalMs(); + maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs(); + } + + log.info("Starting stream load: {}.{}", config.getDatabase(), config.getTable()); + + // Buffer the InputStream once so retries can replay the body + byte[] bodyData = readAll(inputStream); + Review Comment: `load(InputStream)` fully consumes the provided stream but does not close it, and the Javadoc doesn’t state who owns/should close the stream. For non-memory streams (e.g., `FileInputStream`) this can leak resources. Either document that the caller must close the stream, or wrap the read in a try-with-resources and take ownership explicitly. 
########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.examples; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; + +/** + * Unified data generation utilities for all Doris Stream Load examples. + * All examples use a unified Order schema for consistency. + * + * <p>Table DDL (orders): + * <pre> + * CREATE TABLE orders ( + * order_id BIGINT, + * customer_id BIGINT, + * product_name VARCHAR(200), + * category VARCHAR(50), + * brand VARCHAR(50), + * quantity INT, + * unit_price DECIMAL(10,2), + * total_amount DECIMAL(10,2), + * status VARCHAR(20), + * order_date DATETIME, + * region VARCHAR(20) + * ) DISTRIBUTED BY HASH(order_id) BUCKETS 10 + * PROPERTIES ( + * "replication_allocation" = "tag.location.default: 1", Review Comment: The DDL snippet in the Javadoc has a trailing comma after the only property entry, which makes the SQL invalid as written. Please remove the extra comma (or add another property) so users can copy/paste the example DDL successfully. 
```suggestion * "replication_allocation" = "tag.location.default: 1" ``` ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/examples/DataGenerator.java: ########## @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.examples; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; + +/** + * Unified data generation utilities for all Doris Stream Load examples. + * All examples use a unified Order schema for consistency. 
+ * + * <p>Table DDL (orders): + * <pre> + * CREATE TABLE orders ( + * order_id BIGINT, + * customer_id BIGINT, + * product_name VARCHAR(200), + * category VARCHAR(50), + * brand VARCHAR(50), + * quantity INT, + * unit_price DECIMAL(10,2), + * total_amount DECIMAL(10,2), + * status VARCHAR(20), + * order_date DATETIME, + * region VARCHAR(20) + * ) DISTRIBUTED BY HASH(order_id) BUCKETS 10 + * PROPERTIES ( + * "replication_allocation" = "tag.location.default: 1", + * ) + * </pre> + */ +public class DataGenerator { + + private static final String[] CATEGORIES = { + "Electronics", "Clothing", "Books", "Home", "Sports", + "Beauty", "Automotive", "Food", "Health", "Toys" + }; + + private static final String[] BRANDS = { + "Apple", "Samsung", "Nike", "Adidas", "Sony", + "LG", "Canon", "Dell", "HP", "Xiaomi", "Huawei", "Lenovo" + }; + + private static final String[] STATUSES = { + "active", "inactive", "pending", "discontinued", "completed", "cancelled" + }; + + private static final String[] REGIONS = {"North", "South", "East", "West", "Central"}; + + /** + * Generates order data in CSV format. 
+ * + * @param workerID worker ID (0 for single-threaded), used to offset order IDs + * @param batchSize number of records to generate + * @return CSV string (no header row) + */ + public static String generateOrderCSV(int workerID, int batchSize) { + long seed = System.nanoTime() + (long) workerID * 1000; + Random rng = new Random(seed); + int baseOrderID = workerID * batchSize; + + StringBuilder sb = new StringBuilder(batchSize * 200); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + for (int i = 1; i <= batchSize; i++) { + int quantity = rng.nextInt(10) + 1; + double unitPrice = (rng.nextInt(50000) + 1) / 100.0; + double totalAmount = quantity * unitPrice; + String productName = "Product_" + BRANDS[rng.nextInt(BRANDS.length)] + "_" + rng.nextInt(1000); + String category = CATEGORIES[rng.nextInt(CATEGORIES.length)]; + String brand = BRANDS[rng.nextInt(BRANDS.length)]; + String status = STATUSES[rng.nextInt(STATUSES.length)]; + String region = REGIONS[rng.nextInt(REGIONS.length)]; + long offsetMs = (long) rng.nextInt(365 * 24 * 3600) * 1000L; + String orderDate = sdf.format(new Date(System.currentTimeMillis() - offsetMs)); + + sb.append(baseOrderID + i).append(",") + .append(rng.nextInt(100000) + 1).append(",") + .append("\"").append(productName).append("\"").append(",") + .append(category).append(",") + .append(brand).append(",") + .append(quantity).append(",") + .append(String.format("%.2f", unitPrice)).append(",") + .append(String.format("%.2f", totalAmount)).append(",") + .append(status).append(",") + .append(orderDate).append(",") + .append(region).append("\n"); + } + return sb.toString(); + } + + /** + * Generates order data in JSON Lines format (one JSON object per line). 
+ * + * @param workerID worker ID (0 for single-threaded) + * @param batchSize number of records to generate + * @return JSON Lines string + */ + public static String generateOrderJSON(int workerID, int batchSize) { + long seed = System.nanoTime() + (long) workerID * 1000; + Random rng = new Random(seed); + int baseOrderID = workerID * batchSize; + + StringBuilder sb = new StringBuilder(batchSize * 300); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + + for (int i = 1; i <= batchSize; i++) { + int quantity = rng.nextInt(10) + 1; + double unitPrice = (rng.nextInt(50000) + 1) / 100.0; + double totalAmount = quantity * unitPrice; + String productName = "Product_" + BRANDS[rng.nextInt(BRANDS.length)] + "_" + rng.nextInt(1000); + String category = CATEGORIES[rng.nextInt(CATEGORIES.length)]; + String brand = BRANDS[rng.nextInt(BRANDS.length)]; + String status = STATUSES[rng.nextInt(STATUSES.length)]; + String region = REGIONS[rng.nextInt(REGIONS.length)]; + long offsetMs = (long) rng.nextInt(365 * 24 * 3600) * 1000L; + String orderDate = sdf.format(new Date(System.currentTimeMillis() - offsetMs)); Review Comment: `generateOrderJSON()` formats timestamps with a literal `'Z'` suffix but does not set the formatter timezone to UTC, so the produced strings look like UTC while actually representing local time. Either set the `SimpleDateFormat` timezone to UTC, or remove the `'Z'` suffix / use a local-time format to avoid misleading example data. ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.load.internal; + +import org.apache.doris.sdk.load.exception.StreamLoadException; +import org.apache.doris.sdk.load.model.LoadResponse; +import org.apache.doris.sdk.load.model.RespContent; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * Uses RedirectStrategy so that 307 redirects on PUT requests are + * followed automatically (Doris FE redirects stream load to BE). 
+ */ +public class StreamLoader implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(StreamLoader.class); + private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000; + private static final int CONNECT_TIMEOUT_MS = 60_000; + + private final HttpClientBuilder httpClientBuilder; + private final ObjectMapper objectMapper; + + public StreamLoader() { + this.httpClientBuilder = buildHttpClient(); + this.objectMapper = new ObjectMapper(); + } + + /** Package-private constructor for testing with a mock HTTP client. */ + StreamLoader(HttpClientBuilder httpClientBuilder) { + this.httpClientBuilder = httpClientBuilder; + this.objectMapper = new ObjectMapper(); + } + + /** + * Executes the HTTP PUT request and returns a LoadResponse. + * + * @throws StreamLoadException for retryable HTTP-level errors (non-200 status, connection failure) + * @throws IOException for unrecoverable I/O errors + */ + public LoadResponse execute(HttpPut request) throws IOException { Review Comment: The Javadoc says this method can throw `IOException` for unrecoverable I/O errors, but the implementation wraps all `IOException` into `StreamLoadException` (runtime). Either adjust the Javadoc/signature to reflect actual behavior, or allow non-retryable I/O errors (e.g., JSON parse) to propagate as `IOException`. ```suggestion * @throws StreamLoadException for HTTP-level errors (non-200 status, connection failure) * and unrecoverable I/O errors (including JSON parse failures) */ public LoadResponse execute(HttpPut request) { ``` ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.load.internal; + +import org.apache.doris.sdk.load.exception.StreamLoadException; +import org.apache.doris.sdk.load.model.LoadResponse; +import org.apache.doris.sdk.load.model.RespContent; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * Uses RedirectStrategy so that 307 redirects on PUT requests are + * followed automatically (Doris FE redirects stream load to BE). 
+ */ +public class StreamLoader implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(StreamLoader.class); + private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000; + private static final int CONNECT_TIMEOUT_MS = 60_000; + + private final HttpClientBuilder httpClientBuilder; + private final ObjectMapper objectMapper; + + public StreamLoader() { + this.httpClientBuilder = buildHttpClient(); + this.objectMapper = new ObjectMapper(); + } + + /** Package-private constructor for testing with a mock HTTP client. */ + StreamLoader(HttpClientBuilder httpClientBuilder) { + this.httpClientBuilder = httpClientBuilder; + this.objectMapper = new ObjectMapper(); + } + + /** + * Executes the HTTP PUT request and returns a LoadResponse. + * + * @throws StreamLoadException for retryable HTTP-level errors (non-200 status, connection failure) + * @throws IOException for unrecoverable I/O errors + */ + public LoadResponse execute(HttpPut request) throws IOException { + log.debug("Sending HTTP PUT to {}", request.getURI()); + long start = System.currentTimeMillis(); + + try (CloseableHttpClient httpClient = httpClientBuilder.build(); + CloseableHttpResponse response = httpClient.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); Review Comment: `execute()` builds and closes a new `CloseableHttpClient` for every request. This prevents connection pooling/keep-alive reuse and adds significant overhead under concurrency. Consider keeping a single `CloseableHttpClient` instance in `StreamLoader` (built once from the builder) and closing it in `close()`. ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java: ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.load.internal; + +import org.apache.doris.sdk.load.config.DorisConfig; +import org.apache.doris.sdk.load.config.GroupCommitMode; Review Comment: `GroupCommitMode` is imported but never referenced by name (the `switch` cases resolve from the expression type). This creates an unused import warning and can fail builds that enable unused-import checks. Please remove the unused import. ```suggestion ``` ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/RequestBuilder.java: ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.sdk.load.internal; + +import org.apache.doris.sdk.load.config.DorisConfig; +import org.apache.doris.sdk.load.config.GroupCommitMode; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +/** + * Builds HttpPut requests for Doris stream load. + * Handles header assembly, label generation, and group commit logic. + */ +public class RequestBuilder { + + private static final Logger log = LoggerFactory.getLogger(RequestBuilder.class); + private static final String STREAM_LOAD_PATTERN = "http://%s/api/%s/%s/_stream_load"; + private static final Random RANDOM = new Random(); + + /** + * Builds an HttpPut request for the given config and data. + * + * @param config DorisConfig + * @param data pre-buffered (and optionally pre-compressed) request body bytes + * @param attempt 0 = first attempt, >0 = retry number + */ + public static HttpPut build(DorisConfig config, byte[] data, int attempt) throws Exception { + String host = pickEndpoint(config.getEndpoints()); + String url = String.format(STREAM_LOAD_PATTERN, host, config.getDatabase(), config.getTable()); + Review Comment: `STREAM_LOAD_PATTERN` hard-codes the `http://` scheme, but `pickEndpoint()` strips both `http://` and `https://`. If a user configures `https://...` endpoints, requests will still be sent over plain HTTP. Consider preserving the endpoint scheme (or parsing the endpoint as a URL and reusing its scheme) when building the stream load URL. ########## sdk/java-doris-sdk/pom.xml: ########## @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.doris</groupId> + <artifactId>java-doris-sdk</artifactId> + <version>1.0.0</version> + <packaging>jar</packaging> + + <name>Apache Doris Java SDK</name> + <description>A lightweight Apache Doris Stream Load client (Java version)</description> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <!-- HTTP client --> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.13</version> + </dependency> + + <!-- JSON serialization --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.13.5</version> + </dependency> + + <!-- Logging facade --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.9</version> + </dependency> Review Comment: This module depends on `slf4j-log4j12` (Log4j 1.x binding) as a main 
dependency, which (1) forces a logging backend on all SDK consumers and (2) pulls in the EOL Log4j 1.x stack with known security issues. Prefer depending on `slf4j-api` only, and add a backend (e.g., `slf4j-simple`/logback) only for tests/examples. ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/DorisLoadClient.java: ########## @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.load; + +import org.apache.doris.sdk.load.config.DorisConfig; +import org.apache.doris.sdk.load.exception.StreamLoadException; +import org.apache.doris.sdk.load.internal.RequestBuilder; +import org.apache.doris.sdk.load.internal.StreamLoader; +import org.apache.doris.sdk.load.model.LoadResponse; +import org.apache.http.client.methods.HttpPut; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPOutputStream; + +/** + * Thread-safe Doris stream load client. 
+ * + * <p>Usage: + * <pre> + * DorisLoadClient client = new DorisLoadClient(config); + * LoadResponse resp = client.load(new ByteArrayInputStream(data)); + * if (resp.getStatus() == LoadResponse.Status.SUCCESS) { ... } + * </pre> + * + * <p>Thread safety: this instance can be shared across threads. + * Each {@link #load} call must receive an independent InputStream. + */ +public class DorisLoadClient implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(DorisLoadClient.class); + /** Absolute maximum for a single backoff interval: 5 minutes. */ + private static final long ABSOLUTE_MAX_INTERVAL_MS = 300_000L; + + private final DorisConfig config; + private final StreamLoader streamLoader; + + public DorisLoadClient(DorisConfig config) { + this.config = config; + this.streamLoader = new StreamLoader(); + } + + /** Package-private constructor for testing with a mock StreamLoader. */ + DorisLoadClient(DorisConfig config, StreamLoader streamLoader) { + this.config = config; + this.streamLoader = streamLoader; + } + + /** + * Loads data from the given InputStream into Doris via stream load. + * The InputStream is fully consumed and buffered before the first attempt. + * Retries with exponential backoff on retryable errors (network/HTTP failures). + * Business failures (bad data, schema mismatch, auth) are returned immediately without retry. 
+ * + * @param inputStream data to load (consumed once; must not be shared across threads) + * @return LoadResponse with status SUCCESS or FAILURE + * @throws IOException if the stream cannot be read or all retries are exhausted + */ + public LoadResponse load(InputStream inputStream) throws IOException { + int maxRetries = 6; + long baseIntervalMs = 1000L; + long maxTotalTimeMs = 60000L; + + if (config.getRetry() != null) { + maxRetries = config.getRetry().getMaxRetryTimes(); + baseIntervalMs = config.getRetry().getBaseIntervalMs(); + maxTotalTimeMs = config.getRetry().getMaxTotalTimeMs(); + } + + log.info("Starting stream load: {}.{}", config.getDatabase(), config.getTable()); + + // Buffer the InputStream once so retries can replay the body + byte[] bodyData = readAll(inputStream); + + // Compress once before the retry loop (avoids re-compressing on each retry) + if (config.isEnableGzip()) { + bodyData = gzipCompress(bodyData); + } + Exception lastException = null; + LoadResponse lastResponse = null; + long operationStart = System.currentTimeMillis(); + + for (int attempt = 0; attempt <= maxRetries; attempt++) { + if (attempt > 0) { + log.info("Retry attempt {}/{}", attempt, maxRetries); + // Use actual wall-clock elapsed time (includes request time, not just sleep time) + long elapsed = System.currentTimeMillis() - operationStart; + long backoff = calculateBackoffMs(attempt, baseIntervalMs, maxTotalTimeMs, elapsed); + + if (maxTotalTimeMs > 0 && elapsed + backoff > maxTotalTimeMs) { + log.warn("Next retry backoff ({}ms) would exceed total limit ({}ms). 
Stopping.", backoff, maxTotalTimeMs); + break; + } + + log.info("Waiting {}ms before retry (elapsed so far: {}ms)", backoff, elapsed); + sleep(backoff); + } else { + log.info("Initial load attempt"); + } + + try { + HttpPut request = RequestBuilder.build(config, bodyData, attempt); + lastResponse = streamLoader.execute(request); + + if (lastResponse.getStatus() == LoadResponse.Status.SUCCESS) { + log.info("Stream load succeeded on attempt {}", attempt + 1); + return lastResponse; + } + + // Business failure (bad data, schema mismatch, auth) — do not retry + log.error("Load failed (non-retryable): {}", lastResponse.getErrorMessage()); + return lastResponse; + + } catch (StreamLoadException e) { + // Retryable: network error, HTTP 5xx, etc. + lastException = e; + log.error("Attempt {} failed with retryable error: ", attempt + 1, e); + + // Check elapsed wall-clock time (same guard as the Exception branch below) + long elapsed = System.currentTimeMillis() - operationStart; + if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) { + log.warn("Total elapsed time ({}ms) exceeded limit ({}ms), stopping retries.", elapsed, maxTotalTimeMs); + break; + } + } catch (Exception e) { + // Wrap unexpected exceptions as retryable + lastException = new StreamLoadException("Unexpected error building request: " + e.getMessage(), e); + log.error("Attempt {} failed with unexpected error: ", attempt + 1, e); + + // Check elapsed wall-clock time + long elapsed = System.currentTimeMillis() - operationStart; + if (maxTotalTimeMs > 0 && elapsed > maxTotalTimeMs) { + log.warn("Total elapsed time ({}ms) exceeded limit ({}ms), stopping retries.", elapsed, maxTotalTimeMs); + break; + } + } + } + + log.debug("Total operation time: {}ms", System.currentTimeMillis() - operationStart); + + if (lastException != null) { + throw new IOException("Stream load failed after " + (maxRetries + 1) + " attempts", lastException); + } + + if (lastResponse != null) { + return lastResponse; + } + + throw new 
IOException("Stream load failed: unknown error"); + } + + /** + * Calculates exponential backoff interval in milliseconds. + * Package-private for unit testing. + * + * Formula: base * 2^(attempt-1), constrained by remaining total time and absolute max. + * + * @param elapsedMs actual wall-clock time elapsed since the operation started (includes request time) + */ + static long calculateBackoffMs(int attempt, long baseIntervalMs, long maxTotalTimeMs, long elapsedMs) { + if (attempt <= 0) return 0; + long intervalMs = baseIntervalMs * (1L << (attempt - 1)); // base * 2^(attempt-1) + + if (maxTotalTimeMs > 0) { + long remaining = maxTotalTimeMs - elapsedMs - 5000; // reserve 5s for the next request + if (remaining <= 0) { + intervalMs = 0; + } else if (intervalMs > remaining) { + intervalMs = remaining; + } + } + + if (intervalMs > ABSOLUTE_MAX_INTERVAL_MS) intervalMs = ABSOLUTE_MAX_INTERVAL_MS; + if (intervalMs < 0) intervalMs = 0; + return intervalMs; Review Comment: `calculateBackoffMs` uses bit shifting (`1L << (attempt - 1)`) which overflows for large `attempt` values (>= 64), resulting in negative/zero intervals and breaking backoff behavior. Consider capping the exponent (e.g., `Math.min(attempt - 1, 30)`), or using a safe multiply loop/`Math.multiplyExact` with overflow handling. ########## sdk/java-doris-sdk/src/main/java/org/apache/doris/sdk/load/internal/StreamLoader.java: ########## @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.sdk.load.internal; + +import org.apache.doris.sdk.load.exception.StreamLoadException; +import org.apache.doris.sdk.load.model.LoadResponse; +import org.apache.doris.sdk.load.model.RespContent; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * Uses RedirectStrategy so that 307 redirects on PUT requests are + * followed automatically (Doris FE redirects stream load to BE). 
+ */ +public class StreamLoader implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(StreamLoader.class); + private static final int SOCKET_TIMEOUT_MS = 9 * 60 * 1000; + private static final int CONNECT_TIMEOUT_MS = 60_000; + + private final HttpClientBuilder httpClientBuilder; + private final ObjectMapper objectMapper; + + public StreamLoader() { + this.httpClientBuilder = buildHttpClient(); + this.objectMapper = new ObjectMapper(); + } + + /** Package-private constructor for testing with a mock HTTP client. */ + StreamLoader(HttpClientBuilder httpClientBuilder) { + this.httpClientBuilder = httpClientBuilder; + this.objectMapper = new ObjectMapper(); + } + + /** + * Executes the HTTP PUT request and returns a LoadResponse. + * + * @throws StreamLoadException for retryable HTTP-level errors (non-200 status, connection failure) + * @throws IOException for unrecoverable I/O errors + */ + public LoadResponse execute(HttpPut request) throws IOException { + log.debug("Sending HTTP PUT to {}", request.getURI()); + long start = System.currentTimeMillis(); + + try (CloseableHttpClient httpClient = httpClientBuilder.build(); + CloseableHttpResponse response = httpClient.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + log.debug("HTTP response status: {}", statusCode); + log.debug("HTTP request completed in {} ms", System.currentTimeMillis() - start); + + if (statusCode == 200) { + return parseResponse(response); + } else { + // Non-200 is retryable (e.g. 
503, 429) + throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); + } + } catch (StreamLoadException e) { + throw e; + } catch (IOException e) { + throw new StreamLoadException("stream load request failed: " + e.getMessage(), e); + } + } + + private LoadResponse parseResponse(CloseableHttpResponse response) throws IOException { + byte[] bodyBytes = EntityUtils.toByteArray(response.getEntity()); + String body = new String(bodyBytes, StandardCharsets.UTF_8); + log.info("Stream Load Response: {}", body); + + RespContent resp = objectMapper.readValue(body, RespContent.class); + + if (isSuccess(resp.getStatus())) { + log.info("Load operation completed successfully"); + return LoadResponse.success(resp); + } else { + log.error("Load operation failed with status: {}", resp.getStatus()); + String errorMsg; + if (resp.getMessage() != null && !resp.getMessage().isEmpty()) { + errorMsg = "load failed. cause by: " + resp.getMessage() + + ", please check more detail from url: " + resp.getErrorUrl(); + } else { + errorMsg = body; + } + return LoadResponse.failure(resp, errorMsg); + } + } + + private static boolean isSuccess(String status) { + return "success".equalsIgnoreCase(status); + } + + private static HttpClientBuilder buildHttpClient() { + try { + RequestConfig requestConfig = RequestConfig.custom() + .setConnectTimeout(CONNECT_TIMEOUT_MS) + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECT_TIMEOUT_MS) + .build(); + + return HttpClientBuilder.create() + .setDefaultRequestConfig(requestConfig) + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }) + .setSSLSocketFactory( + new SSLConnectionSocketFactory( + SSLContextBuilder.create() + .loadTrustMaterial(null, (chain, authType) -> true) + .build(), + NoopHostnameVerifier.INSTANCE)); Review Comment: `buildHttpClient()` configures an SSL context that trusts all certificates 
and disables hostname verification. This effectively removes TLS security (man-in-the-middle attacks become possible) for any HTTPS endpoint. Prefer using the JVM default trust store/hostname verifier, or make an explicit `insecureSkipVerify`-style option that is off by default and clearly documented. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
