This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f0eb48e9cc1 [Enhancement](sdk) Add Go SDK as a subproject under
sdk/go-doris-sdk (#58404)
f0eb48e9cc1 is described below
commit f0eb48e9cc13fbeb9dbebf03a5732a69d314c30c
Author: bingquanzhao <[email protected]>
AuthorDate: Thu Nov 27 15:09:37 2025 +0800
[Enhancement](sdk) Add Go SDK as a subproject under sdk/go-doris-sdk
(#58404)
This PR adds an official Go SDK for Apache Doris Stream Load operations
under `sdk/go-doris-sdk/`. The SDK provides a lightweight, production-ready Go
client with features including intelligent retry, multiple format support
(JSON/CSV), group commit modes, and full concurrency safety.
---
.licenserc.yaml | 1 +
sdk/go-doris-sdk/README.md | 377 +++++++++++++++++++++
sdk/go-doris-sdk/cmd/demo/main.go | 109 ++++++
sdk/go-doris-sdk/cmd/examples/main.go | 114 +++++++
sdk/go-doris-sdk/doris.go | 95 ++++++
.../examples/concurrent_load_example.go | 123 +++++++
sdk/go-doris-sdk/examples/data_generator.go | 222 ++++++++++++
sdk/go-doris-sdk/examples/format_usage_example.go | 127 +++++++
sdk/go-doris-sdk/examples/label_removal_demo.go | 149 ++++++++
.../examples/production_concurrent_example.go | 214 ++++++++++++
.../examples/production_json_example.go | 104 ++++++
.../examples/production_single_batch_example.go | 101 ++++++
sdk/go-doris-sdk/examples/simple_config_example.go | 69 ++++
sdk/go-doris-sdk/go.mod | 30 ++
sdk/go-doris-sdk/go.sum | 17 +
.../pkg/load/client/doris_load_client.go | 327 ++++++++++++++++++
sdk/go-doris-sdk/pkg/load/config/load_config.go | 157 +++++++++
.../pkg/load/exception/stream_load_error.go | 36 ++
sdk/go-doris-sdk/pkg/load/load.go | 222 ++++++++++++
.../pkg/load/loader/request_builder.go | 184 ++++++++++
sdk/go-doris-sdk/pkg/load/loader/resp_content.go | 79 +++++
sdk/go-doris-sdk/pkg/load/loader/stream_loader.go | 125 +++++++
sdk/go-doris-sdk/pkg/load/util/http_client.go | 58 ++++
sdk/go-doris-sdk/pkg/load/util/http_client_test.go | 199 +++++++++++
24 files changed, 3239 insertions(+)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 9302360af34..fa146036fdc 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -28,6 +28,7 @@ header:
- "**/apache-orc/**"
- "**/glibc-compatibility/**"
- "**/gutil/**"
+ - "**/go.sum"
- "**/test_data/**"
- "**/jmockit/**"
- "**/*.json"
diff --git a/sdk/go-doris-sdk/README.md b/sdk/go-doris-sdk/README.md
new file mode 100644
index 00000000000..a69904fed60
--- /dev/null
+++ b/sdk/go-doris-sdk/README.md
@@ -0,0 +1,377 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ๐ Doris Go SDK
+
+[](https://golang.org/)
+[](#-thread-safety)
+
+A lightweight Apache Doris import client (Go version) with easy-to-use, high-performance, and production-ready features, continuously maintained by the Apache Doris core contributor team.
+
+## ✨ Key Features
+
+**Easy to Use**: Provides a simple user experience with internal encapsulation of complex logic such as HTTP parameter configuration, multiple data format support, and intelligent retry mechanisms.
+
+**High Performance**: Supports extremely high write throughput with built-in optimizations, including efficient concurrency handling and batch import practices.
+
+**Production Ready**: Validated in large-scale, high-pressure production environments with excellent full-chain observability.
+
+## 📦 Quick Installation
+
+```bash
+go get github.com/apache/doris/sdk/go-doris-sdk
+```
+
+## ๐ Quick Start
+
+### Basic CSV Loading
+
+```go
+package main
+
+import (
+ "fmt"
+ "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+func main() {
+ // 🎯 New API: Direct configuration construction
+ config := &doris.Config{
+ Endpoints: []string{"http://127.0.0.1:8030"},
+ User: "root",
+ Password: "password",
+ Database: "test_db",
+ Table: "users",
+ Format: doris.DefaultCSVFormat(),
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.ASYNC,
+ }
+
+ // Create client
+ client, err := doris.NewLoadClient(config)
+ if err != nil {
+ panic(err)
+ }
+
+ // Load data
+ data := "1,Alice,25\n2,Bob,30\n3,Charlie,35"
+ response, err := client.Load(doris.StringReader(data))
+
+ if err != nil {
+ fmt.Printf("❌ Load failed: %v\n", err)
+ return
+ }
+
+ if response.Status == doris.SUCCESS {
+ fmt.Printf("✅ Successfully loaded %d rows!\n", response.Resp.NumberLoadedRows)
+ }
+}
+```
+
+### JSON Data Loading
+
+```go
+config := &doris.Config{
+ Endpoints: []string{"http://127.0.0.1:8030"},
+ User: "root",
+ Password: "password",
+ Database: "test_db",
+ Table: "users",
+ Format: doris.DefaultJSONFormat(), // JSON Lines format
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.ASYNC,
+}
+
+client, _ := doris.NewLoadClient(config)
+
+// JSON Lines data
+jsonData := `{"id":1,"name":"Alice","age":25}
+{"id":2,"name":"Bob","age":30}
+{"id":3,"name":"Charlie","age":35}`
+
+response, err := client.Load(doris.StringReader(jsonData))
+```
+
+## 🛠️ Configuration Guide
+
+### Basic Configuration
+
+```go
+config := &doris.Config{
+ // Required fields
+ Endpoints: []string{
+ "http://fe1:8630",
+ "http://fe2:8630", // Multiple FE nodes supported, auto load balancing
+ },
+ User: "your_username",
+ Password: "your_password",
+ Database: "your_database",
+ Table: "your_table",
+
+ // Optional fields
+ LabelPrefix: "my_app", // Label prefix
+ Label: "custom_label_001", // Custom label
+ Format: doris.DefaultCSVFormat(),
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.ASYNC,
+ Options: map[string]string{
+ "timeout": "3600",
+ "max_filter_ratio": "0.1",
+ "strict_mode": "true",
+ },
+}
+```
+
+### Data Format Configuration
+
+```go
+// 1. Use default formats (recommended)
+Format: doris.DefaultJSONFormat() // JSON Lines, read_json_by_line=true
+Format: doris.DefaultCSVFormat() // CSV, comma separated, newline delimiter
+
+// 2. Custom JSON format
+Format: &doris.JSONFormat{Type: doris.JSONObjectLine} // JSON Lines
+Format: &doris.JSONFormat{Type: doris.JSONArray} // JSON Array
+
+// 3. Custom CSV format
+Format: &doris.CSVFormat{
+ ColumnSeparator: "|", // Pipe separator
+ LineDelimiter: "\n", // Newline delimiter
+}
+```
+
+### Retry Strategy Configuration
+
+```go
+// 1. Use default retry (recommended)
+Retry: doris.DefaultRetry() // 6 retries, 60 seconds total
+// Retry intervals: [1s, 2s, 4s, 8s, 16s, 32s]
+
+// 2. Custom retry
+Retry: &doris.Retry{
+ MaxRetryTimes: 3, // Maximum retry times
+ BaseIntervalMs: 2000, // Base interval 2 seconds
+ MaxTotalTimeMs: 30000, // Total time limit 30 seconds
+}
+
+// 3. Disable retry
+Retry: nil
+```
+
+### Group Commit Mode
+
+```go
+GroupCommit: doris.ASYNC, // Async mode, highest throughput
+GroupCommit: doris.SYNC, // Sync mode, immediately visible
+GroupCommit: doris.OFF, // Off, use traditional mode
+```
+
+> ⚠️ **Note**: When Group Commit is enabled, all Label configurations are automatically ignored and warning logs are recorded.
+
+## ๐ Concurrent Usage
+
+### Basic Concurrency Example
+
+```go
+func worker(id int, client *doris.DorisLoadClient, wg *sync.WaitGroup) {
+ defer wg.Done()
+
+ // ✅ Each worker uses independent data
+ data := fmt.Sprintf("%d,Worker_%d,Data", id, id)
+
+ response, err := client.Load(doris.StringReader(data))
+ if err != nil {
+ fmt.Printf("Worker %d failed: %v\n", id, err)
+ return
+ }
+
+ if response.Status == doris.SUCCESS {
+ fmt.Printf("✅ Worker %d successfully loaded %d rows\n", id, response.Resp.NumberLoadedRows)
+ }
+}
+
+func main() {
+ client, _ := doris.NewLoadClient(config)
+
+ var wg sync.WaitGroup
+ // ๐ Launch 10 concurrent workers
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go worker(i, client, &wg)
+ }
+ wg.Wait()
+}
+```
+
+### ⚠️ Thread Safety Notes
+
+- ✅ **DorisLoadClient is thread-safe** - Can be shared across multiple goroutines
+- ❌ **Readers should not be shared** - Each goroutine should use independent data sources
+
+```go
+// ✅ Correct concurrent pattern
+for i := 0; i < numWorkers; i++ {
+ go func(workerID int) {
+ data := generateWorkerData(workerID) // Independent data
+ response, err := client.Load(doris.StringReader(data))
+ }(i)
+}
+
+// ❌ Wrong concurrent pattern - Don't do this!
+file, _ := os.Open("data.csv")
+for i := 0; i < 10; i++ {
+ go func() {
+ client.Load(file) // ❌ Multiple goroutines sharing same Reader
+ }()
+}
+```
+
+## ๐ Response Handling
+
+```go
+response, err := client.Load(data)
+
+// 1. Check system-level errors
+if err != nil {
+ fmt.Printf("System error: %v\n", err)
+ return
+}
+
+// 2. Check load status
+switch response.Status {
+case doris.SUCCESS:
+ fmt.Printf("✅ Load successful!\n")
+ fmt.Printf("๐ Statistics:\n")
+ fmt.Printf(" - Loaded rows: %d\n", response.Resp.NumberLoadedRows)
+ fmt.Printf(" - Loaded bytes: %d\n", response.Resp.LoadBytes)
+ fmt.Printf(" - Time: %d ms\n", response.Resp.LoadTimeMs)
+ fmt.Printf(" - Label: %s\n", response.Resp.Label)
+
+case doris.FAILURE:
+ fmt.Printf("❌ Load failed: %s\n", response.ErrorMessage)
+
+ // Get detailed error information
+ if response.Resp.ErrorURL != "" {
+ fmt.Printf("๐ Error details: %s\n", response.Resp.ErrorURL)
+ }
+}
+```
+
+## ๐ Log Control
+
+### Basic Log Configuration
+
+```go
+// Set log level
+doris.SetLogLevel(doris.LogLevelInfo) // Recommended for production
+doris.SetLogLevel(doris.LogLevelDebug) // For development debugging
+doris.SetLogLevel(doris.LogLevelError) // Only show errors
+
+// Disable all logs
+doris.DisableLogging()
+
+// Output to file
+file, _ := os.OpenFile("doris.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
+doris.SetLogOutput(file)
+```
+
+### Concurrent Scenario Logging
+
+```go
+// Create context logger for each worker
+logger := doris.NewContextLogger("Worker-1")
+logger.Infof("Starting batch %d", batchID)
+logger.Warnf("Retry detected, attempt: %d", retryCount)
+```
+
+### Integrate Third-party Logging Libraries
+
+```go
+import "github.com/sirupsen/logrus"
+
+logger := logrus.New()
+logger.SetLevel(logrus.InfoLevel)
+
+// Integrate with Doris SDK
+doris.SetCustomLogFuncs(
+ logger.Debugf, // Debug level
+ logger.Infof, // Info level
+ logger.Warnf, // Warn level
+ logger.Errorf, // Error level
+)
+```
+
+## ๐ Production Examples
+
+We provide complete production-level examples:
+
+```bash
+# Run all examples
+go run cmd/examples/main.go all
+
+# Individual examples
+go run cmd/examples/main.go single # Large batch load (100k records)
+go run cmd/examples/main.go concurrent # Concurrent load (1M records, 10 workers)
+go run cmd/examples/main.go json # JSON load (50k records)
+go run cmd/examples/main.go basic # Basic concurrency (5 workers)
+```
+
+## 🛠️ Utility Tools
+
+### Data Conversion Helpers
+
+```go
+// String to Reader
+reader := doris.StringReader("1,Alice,25\n2,Bob,30")
+
+// Byte array to Reader
+data := []byte("1,Alice,25\n2,Bob,30")
+reader := doris.BytesReader(data)
+
+// Struct to JSON Reader
+users := []User{{ID: 1, Name: "Alice"}}
+reader, err := doris.JSONReader(users)
+```
+
+### Default Configuration Builders
+
+```go
+// Quick create common configurations
+retry := doris.DefaultRetry() // 6 retries, 60 seconds total
+jsonFormat := doris.DefaultJSONFormat() // JSON Lines format
+csvFormat := doris.DefaultCSVFormat() // Standard CSV format
+
+// Custom configuration
+customRetry := doris.NewRetry(3, 1000) // 3 retries, 1 second base interval
+```
+
+## ๐ Documentation and Examples
+
+- ๐ [API Migration Guide](docs/API_MIGRATION_GUIDE.md) - Guide for upgrading from old API
+- 🧵 [Thread Safety Analysis](docs/THREAD_SAFETY_ANALYSIS.md) - Detailed concurrency safety documentation
+- ๐ [Reader Concurrency Analysis](docs/READER_CONCURRENCY_ANALYSIS.md) - Reader usage best practices
+- ๐ [Example Details](examples/README.md) - Detailed explanation of all examples
+
+## 🤝 Contributing
+
+Contributions are welcome! Please feel free to submit a Pull Request.
+
+## ๐ Acknowledgments
+
+Maintained by the Apache Doris core contributor team.
diff --git a/sdk/go-doris-sdk/cmd/demo/main.go
b/sdk/go-doris-sdk/cmd/demo/main.go
new file mode 100644
index 00000000000..e5d75987a60
--- /dev/null
+++ b/sdk/go-doris-sdk/cmd/demo/main.go
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "fmt"
+
+ "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+func main() {
+ fmt.Println("Doris SDK - Format Interface Demonstration")
+
+ // ========== Demo 1: Using JSONFormat ==========
+ fmt.Println("\n=== Demo 1: JSON Format Configuration ===")
+
+ jsonConfig := &doris.Config{
+ Endpoints: []string{"http://10.16.10.6:8630"},
+ User: "root",
+ Password: "password",
+ Database: "test_db",
+ Table: "test_table",
+ LabelPrefix: "json_demo",
+ // User directly constructs JSONFormat
+ Format: &doris.JSONFormat{Type: doris.JSONObjectLine},
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.ASYNC,
+ Options: map[string]string{
+ "strict_mode": "true",
+ },
+ }
+
+ fmt.Printf("JSON Config Format Type: %s\n",
jsonConfig.Format.GetFormatType())
+ fmt.Printf("JSON Format Options: %+v\n", jsonConfig.Format.GetOptions())
+
+ // ========== Demo 2: Using CSVFormat ==========
+ fmt.Println("\n=== Demo 2: CSV Format Configuration ===")
+
+ csvConfig := &doris.Config{
+ Endpoints: []string{"http://10.16.10.6:8630"},
+ User: "root",
+ Password: "password",
+ Database: "test_db",
+ Table: "test_table",
+ LabelPrefix: "csv_demo",
+ // User directly constructs CSVFormat
+ Format: &doris.CSVFormat{
+ ColumnSeparator: ",",
+ LineDelimiter: "\n",
+ },
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.SYNC,
+ Options: map[string]string{
+ "max_filter_ratio": "0.1",
+ },
+ }
+
+ fmt.Printf("CSV Config Format Type: %s\n",
csvConfig.Format.GetFormatType())
+ fmt.Printf("CSV Format Options: %+v\n", csvConfig.Format.GetOptions())
+
+ // ========== Demo 3: Other JSON Formats ==========
+ fmt.Println("\n=== Demo 3: JSON Array Format ===")
+
+ jsonArrayConfig := &doris.Config{
+ Endpoints: []string{"http://10.16.10.6:8630"},
+ User: "root",
+ Password: "password",
+ Database: "test_db",
+ Table: "test_table",
+ // Directly construct JSONFormat - Array type
+ Format: &doris.JSONFormat{Type: doris.JSONArray},
+ Retry: &doris.Retry{MaxRetryTimes: 3, BaseIntervalMs:
2000},
+ GroupCommit: doris.OFF,
+ }
+
+ fmt.Printf("JSON Array Format Type: %s\n",
jsonArrayConfig.Format.GetFormatType())
+ fmt.Printf("JSON Array Format Options: %+v\n",
jsonArrayConfig.Format.GetOptions())
+
+ // ========== Configuration Validation ==========
+ fmt.Println("\n=== Configuration Validation ===")
+
+ configs := []*doris.Config{jsonConfig, csvConfig, jsonArrayConfig}
+ configNames := []string{"JSON ObjectLine Config", "CSV Config", "JSON
Array Config"}
+
+ for i, config := range configs {
+ if err := config.ValidateInternal(); err != nil {
+ fmt.Printf("%s validation failed: %v\n",
configNames[i], err)
+ } else {
+ fmt.Printf("%s validation passed!\n", configNames[i])
+ }
+ }
+
+ fmt.Println("\nDemonstration complete!")
+}
diff --git a/sdk/go-doris-sdk/cmd/examples/main.go
b/sdk/go-doris-sdk/cmd/examples/main.go
new file mode 100644
index 00000000000..afb87dfa3ec
--- /dev/null
+++ b/sdk/go-doris-sdk/cmd/examples/main.go
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package main provides a unified entry point for running all Doris Stream Load examples
+// Usage: go run cmd/examples/main.go [example_name]
+// Available examples: single, concurrent, json, basic
+package main
+
+import (
+ "fmt"
+ "os"
+ "strings"
+
+ "github.com/apache/doris/sdk/go-doris-sdk/examples"
+)
+
+// usage is the help text shown by printUsage. Each entry stays on a single
+// line so the text is printed without stray line breaks.
+const usage = `
+Doris Stream Load Client - Production Examples Runner
+
+Usage: go run cmd/examples/main.go [example_name]
+
+Available Examples:
+ single - Production single batch loading (100,000 records)
+ concurrent - Production concurrent loading (1,000,000 records across 10 workers)
+ json - Production JSON data loading (50,000 JSON records)
+ basic - Basic concurrent loading demo (5 workers)
+ all - Run all examples sequentially
+
+Examples:
+ go run cmd/examples/main.go single
+ go run cmd/examples/main.go concurrent
+ go run cmd/examples/main.go json
+ go run cmd/examples/main.go basic
+ go run cmd/examples/main.go all
+
+Description:
+ single - Demonstrates single-threaded large batch loading with realistic product data
+ concurrent - Shows high-throughput concurrent loading with 10 workers processing order data
+ json - Illustrates JSON Lines format loading with structured user activity data
+ basic - Simple concurrent example for learning and development
+ all - Runs all examples in sequence for comprehensive testing
+
+For more details, see examples/README.md
+`
+
+// printUsage writes the usage text to standard output.
+func printUsage() {
+	fmt.Print(usage)
+}
+
+// runExample runs the example selected by name, matched case-insensitively.
+// "all" runs every example in a fixed order, printing a separator line before
+// each one and after the last. Any other unrecognized name prints an error and
+// the usage text, then exits the process with status 1.
+func runExample(name string) {
+	// Blank line followed by a rule of 80 '=' characters.
+	separator := "\n" + strings.Repeat("=", 80)
+
+	switch strings.ToLower(name) {
+	case "single":
+		fmt.Println("Running Production Single Batch Example...")
+		examples.RunSingleBatchExample()
+	case "concurrent":
+		fmt.Println("Running Production Concurrent Example...")
+		examples.RunConcurrentExample()
+	case "json":
+		fmt.Println("Running Production JSON Example...")
+		examples.RunJSONExample()
+	case "basic":
+		fmt.Println("Running Basic Concurrent Example...")
+		examples.RunBasicConcurrentExample()
+	case "all":
+		fmt.Println("Running All Examples...")
+		fmt.Println(separator)
+		examples.RunSingleBatchExample()
+		fmt.Println(separator)
+		examples.RunConcurrentExample()
+		fmt.Println(separator)
+		examples.RunJSONExample()
+		fmt.Println(separator)
+		examples.RunBasicConcurrentExample()
+		fmt.Println(separator)
+		fmt.Println("All examples completed!")
+	default:
+		fmt.Printf("❌ Unknown example: %s\n\n", name)
+		printUsage()
+		os.Exit(1)
+	}
+}
+
+// main prints a banner and reads the example name from the first command-line
+// argument. When the argument is missing it prints an error and the usage text
+// and exits with status 1; otherwise it passes the name to runExample.
+func main() {
+	fmt.Println("๐ Doris Stream Load Client - Examples Runner")
+	fmt.Println("=" + strings.Repeat("=", 50))
+
+	if len(os.Args) < 2 {
+		// Two calls rather than a trailing "\n" in the argument: go vet rejects
+		// a Println argument list that ends in a redundant newline. The printed
+		// output (message, then one blank line) is the same.
+		fmt.Println("❌ No example specified")
+		fmt.Println()
+		printUsage()
+		os.Exit(1)
+	}
+
+	exampleName := os.Args[1]
+
+	// Show what we're about to run
+	fmt.Printf("๐ Selected example: %s\n", exampleName)
+	fmt.Println(strings.Repeat("-", 50))
+
+	runExample(exampleName)
+}
diff --git a/sdk/go-doris-sdk/doris.go b/sdk/go-doris-sdk/doris.go
new file mode 100644
index 00000000000..520e026249d
--- /dev/null
+++ b/sdk/go-doris-sdk/doris.go
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package doris provides a high-level API for loading data into Apache Doris
+// This is a backward-compatible wrapper that re-exports functionality from pkg/load
+package doris
+
+import "github.com/apache/doris/sdk/go-doris-sdk/pkg/load"
+
+// Type aliases that expose the pkg/load types from the top-level package.
+type (
+	// Configuration and client.
+	Config          = load.Config
+	DorisLoadClient = load.DorisLoadClient
+
+	// Data formats.
+	Format         = load.Format
+	JSONFormatType = load.JSONFormatType
+	JSONFormat     = load.JSONFormat
+	CSVFormat      = load.CSVFormat
+
+	// Logging.
+	LogLevel      = load.LogLevel
+	LogFunc       = load.LogFunc
+	ContextLogger = load.ContextLogger
+
+	// Load results.
+	LoadResponse = load.LoadResponse
+	LoadStatus   = load.LoadStatus
+
+	// Group commit mode and retry settings.
+	GroupCommitMode = load.GroupCommitMode
+	Retry           = load.Retry
+)
+
+// Constants re-exported from pkg/load.
+const (
+	// JSON format kinds.
+	JSONObjectLine = load.JSONObjectLine
+	JSONArray      = load.JSONArray
+
+	// Group commit modes.
+	SYNC  = load.SYNC
+	ASYNC = load.ASYNC
+	OFF   = load.OFF
+
+	// Load statuses.
+	SUCCESS = load.SUCCESS
+	FAILURE = load.FAILURE
+
+	// Log levels.
+	LogLevelDebug = load.LogLevelDebug
+	LogLevelInfo  = load.LogLevelInfo
+	LogLevelWarn  = load.LogLevelWarn
+	LogLevelError = load.LogLevelError
+)
+
+// Functions re-exported from pkg/load as package-level variables.
+var (
+	// Client construction.
+	NewLoadClient = load.NewLoadClient
+
+	// Helpers that turn in-memory data into readers.
+	StringReader = load.StringReader
+	BytesReader  = load.BytesReader
+	JSONReader   = load.JSONReader
+
+	// Logging controls.
+	SetLogLevel       = load.SetLogLevel
+	SetLogOutput      = load.SetLogOutput
+	DisableLogging    = load.DisableLogging
+	SetCustomLogFunc  = load.SetCustomLogFunc
+	SetCustomLogFuncs = load.SetCustomLogFuncs
+	NewContextLogger  = load.NewContextLogger
+
+	// Builders for default formats and retry settings.
+	DefaultJSONFormat = load.DefaultJSONFormat
+	DefaultCSVFormat  = load.DefaultCSVFormat
+	DefaultRetry      = load.DefaultRetry
+	NewRetry          = load.NewRetry
+	NewDefaultRetry   = load.NewDefaultRetry
+)
diff --git a/sdk/go-doris-sdk/examples/concurrent_load_example.go
b/sdk/go-doris-sdk/examples/concurrent_load_example.go
new file mode 100644
index 00000000000..42b63a638b6
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/concurrent_load_example.go
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package examples demonstrates basic concurrent loading with enhanced logging and thread safety
+// This example shows how multiple goroutines can safely share a single DorisLoadClient
+// Key features: thread-safe client, enhanced logging with goroutine tracking, proper error handling
+// Uses unified orders schema for consistency across all examples
+package examples
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// workerFunction simulates a worker that loads data concurrently
+func workerFunction(workerID int, client *doris.DorisLoadClient, wg
*sync.WaitGroup) {
+ defer wg.Done()
+
+ // Create context logger for this worker
+ workerLogger := doris.NewContextLogger(fmt.Sprintf("Worker-%d",
workerID))
+
+ // Generate unique order data for this worker using unified schema
+ data := GenerateSimpleOrderCSV(workerID)
+
+ workerLogger.Infof("Starting load operation with %d bytes of order
data", len(data))
+
+ // Perform the load operation
+ start := time.Now()
+ response, err := client.Load(doris.StringReader(data))
+ duration := time.Since(start)
+
+ // Simple response handling
+ if err != nil {
+ fmt.Printf("โ Worker-%d failed: %v\n", workerID, err)
+ return
+ }
+
+ if response != nil && response.Status == doris.SUCCESS {
+ fmt.Printf("โ
Worker-%d completed in %v\n", workerID, duration)
+ if response.Resp.Label != "" {
+ fmt.Printf("๐ Worker-%d: Label=%s, Rows=%d\n",
workerID, response.Resp.Label, response.Resp.NumberLoadedRows)
+ }
+ } else {
+ if response != nil {
+ fmt.Printf("โ Worker-%d failed with status: %v\n",
workerID, response.Status)
+ } else {
+ fmt.Printf("โ Worker-%d failed: no response\n",
workerID)
+ }
+ }
+}
+
+// RunBasicConcurrentExample creates one load client and shares it between five
+// goroutines, each running workerFunction. Workers are started 200ms apart;
+// the function waits for all of them and prints the total elapsed time. If the
+// client cannot be created it logs the error and returns without starting any
+// worker.
+func RunBasicConcurrentExample() {
+	fmt.Println("=== Basic Concurrent Loading Demo ===")
+
+	// Enhanced logging configuration
+	doris.SetLogLevel(doris.LogLevelInfo)
+
+	// We can't directly call log.Infof, so create a context logger first
+	logger := doris.NewContextLogger("ConcurrentDemo")
+	logger.Infof("Starting concurrent loading demo with enhanced logging")
+
+	// Create configuration using direct struct construction
+	config := &doris.Config{
+		Endpoints:   []string{"http://10.16.10.6:8630"},
+		User:        "root",
+		Password:    "",
+		Database:    "test",
+		Table:       "orders", // Unified orders table
+		LabelPrefix: "demo_concurrent",
+		Format:      doris.DefaultCSVFormat(), // Use default CSV format
+		Retry:       doris.DefaultRetry(),     // 6 retries: [1s, 2s, 4s, 8s, 16s, 32s] = ~63s total
+		GroupCommit: doris.ASYNC,
+	}
+
+	// Create client (this is thread-safe and can be shared across goroutines)
+	client, err := doris.NewLoadClient(config)
+	if err != nil {
+		logger.Errorf("Failed to create load client: %v", err)
+		return
+	}
+
+	logger.Infof("✅ Load client created successfully")
+
+	// Demonstrate concurrent loading with multiple workers
+	const numWorkers = 5
+	var wg sync.WaitGroup
+
+	fmt.Printf("๐ Launching %d concurrent workers...\n", numWorkers)
+
+	// Launch workers concurrently
+	overallStart := time.Now()
+	for i := 0; i < numWorkers; i++ {
+		wg.Add(1)
+		go workerFunction(i, client, &wg)
+		// Small delay to show worker launch sequence
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	// Wait for all workers to complete
+	wg.Wait()
+	overallDuration := time.Since(overallStart)
+
+	fmt.Printf("๐ All %d workers completed in %v!\n", numWorkers, overallDuration)
+	fmt.Println("=== Demo Complete ===")
+}
diff --git a/sdk/go-doris-sdk/examples/data_generator.go
b/sdk/go-doris-sdk/examples/data_generator.go
new file mode 100644
index 00000000000..0a9b9de2062
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/data_generator.go
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package examples provides unified data generation utilities for all Doris
+// Stream Load examples.
+// This file centralizes all mock data generation logic for better code reuse
+// and maintainability.
+// All examples use the unified Order schema for consistency.
+package examples
+
+import (
+ "fmt"
+ "math/rand"
+ "strings"
+ "time"
+
+ doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// Value pools used to build order test data. The generators in this file pick
+// entries from Categories, Brands, Statuses and Regions at random.
+var (
+	// Categories lists the product categories an order can belong to.
+	Categories = []string{
+		"Electronics", "Clothing", "Books", "Home", "Sports",
+		"Beauty", "Automotive", "Food", "Health", "Toys",
+	}
+
+	// Brands lists the brand names used for both product names and the brand column.
+	Brands = []string{
+		"Apple", "Samsung", "Nike", "Adidas", "Sony", "LG",
+		"Canon", "Dell", "HP", "Xiaomi", "Huawei", "Lenovo",
+	}
+
+	// Statuses lists the possible order status values.
+	Statuses = []string{
+		"active", "inactive", "pending",
+		"discontinued", "completed", "cancelled",
+	}
+
+	// Regions lists the sales regions.
+	Regions = []string{"North", "South", "East", "West", "Central"}
+
+	// Countries and Devices are additional pools kept for variety; the
+	// generators in this file do not reference them.
+	Countries = []string{
+		"US", "CN", "JP", "DE", "UK",
+		"FR", "CA", "AU", "IN", "BR",
+	}
+	Devices = []string{"mobile", "desktop", "tablet"}
+)
+
+// OrderRecord is the single e-commerce order shape shared by all examples.
+// The json tags carry the snake_case column names of the orders schema.
+type OrderRecord struct {
+	// Identity of the order and of the customer who placed it.
+	OrderID    int `json:"order_id"`
+	CustomerID int `json:"customer_id"`
+
+	// What was ordered.
+	ProductName string `json:"product_name"`
+	Category    string `json:"category"`
+	Brand       string `json:"brand"`
+
+	// How many units, at what price, and the resulting total.
+	Quantity    int     `json:"quantity"`
+	UnitPrice   float64 `json:"unit_price"`
+	TotalAmount float64 `json:"total_amount"`
+
+	// Order state, timestamp (stored as an already formatted string) and region.
+	Status    string `json:"status"`
+	OrderDate string `json:"order_date"`
+	Region    string `json:"region"`
+}
+
+// DataGeneratorConfig carries the knobs of one data generation run.
+type DataGeneratorConfig struct {
+	// WorkerID identifies the caller in concurrent scenarios.
+	WorkerID int
+	// BatchSize is the number of records to generate.
+	BatchSize int
+	// ContextName names the logging context.
+	ContextName string
+	// RandomSeed seeds the random source, for reproducible random data.
+	RandomSeed int64
+}
+
+// GenerateOrderCSV builds config.BatchSize random order rows as CSV text, one
+// row per line and without a header line. Column order:
+// order_id,customer_id,product_name,category,brand,quantity,unit_price,total_amount,status,order_date,region
+//
+// Order IDs run upward from config.WorkerID*config.BatchSize+1. When
+// config.RandomSeed is zero the seed is derived from the current time plus a
+// per-worker offset; otherwise RandomSeed is used as the seed. The order date
+// is always relative to the current time. Progress and throughput are logged
+// through a context logger named config.ContextName. A non-positive BatchSize
+// produces an empty string.
+func GenerateOrderCSV(config DataGeneratorConfig) string {
+	contextLogger := doris.NewContextLogger(config.ContextName)
+	contextLogger.Infof("Generating %d order records...", config.BatchSize)
+	start := time.Now()
+
+	// Pre-size the builder at roughly 200 bytes per record. Grow panics on a
+	// negative count, so only call it for a positive estimate.
+	var builder strings.Builder
+	if estimatedSize := config.BatchSize * 200; estimatedSize > 0 {
+		builder.Grow(estimatedSize)
+	}
+
+	// Worker-specific random seed to avoid collisions
+	seed := config.RandomSeed
+	if seed == 0 {
+		seed = time.Now().UnixNano() + int64(config.WorkerID*1000)
+	}
+	rng := rand.New(rand.NewSource(seed))
+	baseOrderID := config.WorkerID * config.BatchSize
+
+	for i := 1; i <= config.BatchSize; i++ {
+		quantity := rng.Intn(10) + 1
+		// Intn(50000) yields 0..49999; the +1 gives $0.01 to $500.00.
+		unitPrice := float64(rng.Intn(50000)+1) / 100.0
+		totalAmount := float64(quantity) * unitPrice
+		// Draw the brand once so the product name and brand column agree.
+		brand := Brands[rng.Intn(len(Brands))]
+
+		record := OrderRecord{
+			OrderID:     baseOrderID + i,
+			CustomerID:  rng.Intn(100000) + 1,
+			ProductName: fmt.Sprintf("Product_%s_%d", brand, rng.Intn(1000)),
+			Category:    Categories[rng.Intn(len(Categories))],
+			Brand:       brand,
+			Quantity:    quantity,
+			UnitPrice:   unitPrice,
+			TotalAmount: totalAmount,
+			Status:      Statuses[rng.Intn(len(Statuses))],
+			// A random whole number of hours within the past 365 days.
+			OrderDate: time.Now().Add(-time.Duration(rng.Intn(365*24)) * time.Hour).Format("2006-01-02 15:04:05"),
+			Region:    Regions[rng.Intn(len(Regions))],
+		}
+
+		// Write the row straight into the builder; only product_name is quoted.
+		fmt.Fprintf(&builder, "%d,%d,\"%s\",%s,%s,%d,%.2f,%.2f,%s,%s,%s\n",
+			record.OrderID,
+			record.CustomerID,
+			record.ProductName,
+			record.Category,
+			record.Brand,
+			record.Quantity,
+			record.UnitPrice,
+			record.TotalAmount,
+			record.Status,
+			record.OrderDate,
+			record.Region,
+		)
+
+		// Progress indicator for large datasets
+		if i%10000 == 0 {
+			contextLogger.Infof("Generated %d/%d records (%.1f%%)", i, config.BatchSize, float64(i)/float64(config.BatchSize)*100)
+		}
+	}
+
+	generationTime := time.Since(start)
+	dataSize := builder.Len()
+	contextLogger.Infof("Order data generation completed: %d records, %d bytes, took %v", config.BatchSize, dataSize, generationTime)
+	contextLogger.Infof("Generation rate: %.0f records/sec, %.1f MB/sec",
+		float64(config.BatchSize)/generationTime.Seconds(),
+		float64(dataSize)/1024/1024/generationTime.Seconds())
+
+	return builder.String()
+}
+
+// GenerateOrderJSON builds config.BatchSize random order records in JSON Lines
+// format: one JSON object per line. The object keys are the snake_case names
+// declared in OrderRecord's json tags (order_id, customer_id, ...), which are
+// also the column names used by the CSV generators.
+//
+// Order IDs run upward from config.WorkerID*config.BatchSize+1. When
+// config.RandomSeed is zero the seed is derived from the current time plus a
+// per-worker offset; otherwise RandomSeed is used as the seed. Progress and
+// throughput are logged through a context logger named config.ContextName.
+// A non-positive BatchSize produces an empty string.
+func GenerateOrderJSON(config DataGeneratorConfig) string {
+	contextLogger := doris.NewContextLogger(config.ContextName)
+	contextLogger.Infof("Generating %d JSON order records...", config.BatchSize)
+	start := time.Now()
+
+	// Pre-size the builder at roughly 300 bytes per record (JSON records are
+	// larger than CSV rows). Grow panics on a negative count, so only call it
+	// for a positive estimate.
+	var builder strings.Builder
+	if estimatedSize := config.BatchSize * 300; estimatedSize > 0 {
+		builder.Grow(estimatedSize)
+	}
+
+	// Worker-specific random seed
+	seed := config.RandomSeed
+	if seed == 0 {
+		seed = time.Now().UnixNano() + int64(config.WorkerID*1000)
+	}
+	rng := rand.New(rand.NewSource(seed))
+	baseOrderID := config.WorkerID * config.BatchSize
+
+	for i := 1; i <= config.BatchSize; i++ {
+		quantity := rng.Intn(10) + 1
+		// Intn(50000) yields 0..49999; the +1 gives $0.01 to $500.00.
+		unitPrice := float64(rng.Intn(50000)+1) / 100.0
+		totalAmount := float64(quantity) * unitPrice
+		// Draw the brand once so the product name and brand field agree.
+		brand := Brands[rng.Intn(len(Brands))]
+
+		record := OrderRecord{
+			OrderID:     baseOrderID + i,
+			CustomerID:  rng.Intn(100000) + 1,
+			ProductName: fmt.Sprintf("Product_%s_%d", brand, rng.Intn(1000)),
+			Category:    Categories[rng.Intn(len(Categories))],
+			Brand:       brand,
+			Quantity:    quantity,
+			UnitPrice:   unitPrice,
+			TotalAmount: totalAmount,
+			Status:      Statuses[rng.Intn(len(Statuses))],
+			// NOTE(review): the layout appends a literal "Z" to a local time
+			// value; confirm whether the timestamp is meant to be UTC.
+			OrderDate: time.Now().Add(-time.Duration(rng.Intn(365*24)) * time.Hour).Format("2006-01-02T15:04:05Z"),
+			Region:    Regions[rng.Intn(len(Regions))],
+		}
+
+		// Manual JSON construction keeps two-decimal prices. All string
+		// values come from fixed pools and need no escaping.
+		fmt.Fprintf(&builder, `{"order_id":%d,"customer_id":%d,"product_name":"%s","category":"%s","brand":"%s","quantity":%d,"unit_price":%.2f,"total_amount":%.2f,"status":"%s","order_date":"%s","region":"%s"}`,
+			record.OrderID,
+			record.CustomerID,
+			record.ProductName,
+			record.Category,
+			record.Brand,
+			record.Quantity,
+			record.UnitPrice,
+			record.TotalAmount,
+			record.Status,
+			record.OrderDate,
+			record.Region,
+		)
+		builder.WriteString("\n") // JSON lines format
+
+		// Progress indicator for large datasets
+		if i%10000 == 0 {
+			contextLogger.Infof("Generated %d/%d JSON records (%.1f%%)", i, config.BatchSize, float64(i)/float64(config.BatchSize)*100)
+		}
+	}
+
+	generationTime := time.Since(start)
+	dataSize := builder.Len()
+	contextLogger.Infof("JSON order data generation completed: %d records, %d bytes, took %v", config.BatchSize, dataSize, generationTime)
+	contextLogger.Infof("Generation rate: %.0f records/sec, %.1f MB/sec",
+		float64(config.BatchSize)/generationTime.Seconds(),
+		float64(dataSize)/1024/1024/generationTime.Seconds())
+
+	return builder.String()
+}
+
+// GenerateSimpleOrderCSV returns a two-line CSV snippet for basic examples: a
+// header line naming the eleven order columns followed by one fixed-shape row
+// derived from workerID. The row uses workerID as order id, workerID+1000 as
+// customer id, a quantity of 1 and a unit price of 10 plus workerID.
+func GenerateSimpleOrderCSV(workerID int) string {
+	const (
+		header    = "order_id,customer_id,product_name,category,brand,quantity,unit_price,total_amount,status,order_date,region\n"
+		rowFormat = "%d,%d,\"Product_%d\",Electronics,TestBrand,%d,%.2f,%.2f,active,2024-01-01 12:00:00,Central\n"
+		units     = 1
+	)
+
+	price := 10.0 + float64(workerID)
+	total := float64(units) * price
+
+	return header + fmt.Sprintf(rowFormat, workerID, workerID+1000, workerID, units, price, total)
+}
diff --git a/sdk/go-doris-sdk/examples/format_usage_example.go
b/sdk/go-doris-sdk/examples/format_usage_example.go
new file mode 100644
index 00000000000..50eccc81888
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/format_usage_example.go
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package examples
+
+import (
+ "fmt"
+
+ "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// FormatUsageExample demonstrates how to use Format interface
+func FormatUsageExample() {
+ fmt.Println("=== Format Interface Usage Example ===")
+
+ // Method 1: Direct JSONFormat construction (recommended)
+ jsonConfig := &doris.Config{
+ Endpoints: []string{"http://localhost:8630"},
+ User: "root",
+ Password: "password",
+ Database: "example_db",
+ Table: "example_table",
+ // Direct JSONFormat struct construction
+ Format: &doris.JSONFormat{
+ Type: doris.JSONObjectLine, // or doris.JSONArray
+ },
+ Retry: &doris.Retry{
+ MaxRetryTimes: 3,
+ BaseIntervalMs: 1000,
+ MaxTotalTimeMs: 60000, // Add total time limit
+ },
+ GroupCommit: doris.ASYNC,
+ }
+
+ // Method 2: Direct CSVFormat construction (recommended)
+ csvConfig := &doris.Config{
+ Endpoints: []string{"http://localhost:8630"},
+ User: "root",
+ Password: "password",
+ Database: "example_db",
+ Table: "example_table",
+ // Direct CSVFormat struct construction
+ Format: &doris.CSVFormat{
+ ColumnSeparator: ",",
+ LineDelimiter: "\n",
+ },
+ Retry: &doris.Retry{
+ MaxRetryTimes: 5,
+ BaseIntervalMs: 2000,
+ MaxTotalTimeMs: 60000, // Add total time limit
+ },
+ GroupCommit: doris.SYNC,
+ }
+
+ // Method 3: Custom format configuration
+ customConfig := &doris.Config{
+ Endpoints: []string{"http://localhost:8630"},
+ User: "root",
+ Password: "password",
+ Database: "example_db",
+ Table: "example_table",
+ // Custom CSV separator
+ Format: &doris.CSVFormat{
+ ColumnSeparator: "|", // Pipe separator
+ LineDelimiter: "\\n", // Custom line delimiter
+ },
+ Retry: doris.DefaultRetry(),
+ GroupCommit: doris.OFF,
+ }
+
+ // Demonstrate Format interface usage
+ configs := []*doris.Config{jsonConfig, csvConfig, customConfig}
+ configNames := []string{"JSON Config", "CSV Config", "Custom CSV
Config"}
+
+ for i, config := range configs {
+ fmt.Printf("\n--- %s ---\n", configNames[i])
+ fmt.Printf("Format Type: %s\n", config.Format.GetFormatType())
+ fmt.Printf("Format Options: %v\n", config.Format.GetOptions())
+
+ // Validate configuration
+ if err := config.ValidateInternal(); err != nil {
+ fmt.Printf("Validation Error: %v\n", err)
+ continue
+ }
+
+ // Create client
+ client, err := doris.NewLoadClient(config)
+ if err != nil {
+ fmt.Printf("Client Creation Error: %v\n", err)
+ continue
+ }
+
+ fmt.Printf("Client created successfully for %s\n",
config.Format.GetFormatType())
+
+ // Simulate data loading
+ var sampleData string
+ if config.Format.GetFormatType() == "json" {
+ sampleData = `{"id": 1, "name": "Alice"}
+{"id": 2, "name": "Bob"}`
+ } else {
+ sampleData = `1,Alice
+2,Bob`
+ }
+
+ fmt.Printf("Sample data for %s format:\n%s\n",
config.Format.GetFormatType(), sampleData)
+
+ // Note: This is just a demonstration, actual use requires a
real Doris server
+ _ = client
+ _ = sampleData
+ }
+
+ fmt.Println("\n=== Example Complete ===")
+}
diff --git a/sdk/go-doris-sdk/examples/label_removal_demo.go
b/sdk/go-doris-sdk/examples/label_removal_demo.go
new file mode 100644
index 00000000000..7336af89170
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/label_removal_demo.go
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package examples
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// LabelRemovalDemo runs four small load attempts that differ only in their
+// label settings and group commit mode, so the SDK's log output for each
+// combination can be compared: a custom label, a label prefix, both together
+// (all with group commit enabled), and both with group commit off. Load
+// errors are printed and tolerated; a failure to create a client ends the
+// demo immediately.
+func LabelRemovalDemo() {
+	fmt.Println("=== Label Removal Logging Demo ===")
+
+	// Set log level to see warning messages
+	doris.SetLogLevel(doris.LogLevelInfo)
+
+	const (
+		testData = `{"id": 1, "name": "test"}`
+		csvData  = "1,Alice,30\n2,Bob,25"
+		jsonData = `{"id": 3, "name": "Charlie"}`
+	)
+
+	// Each config is built lazily, after its section title has been printed.
+	demos := []struct {
+		title     string
+		notice    string
+		payload   string
+		newConfig func() *doris.Config
+	}{
+		{
+			// Demo 1: Custom Label + Group Commit
+			title:   "\n--- Demo 1: Custom Label + Group Commit ---",
+			notice:  "Attempting to load data, observe label removal logs...",
+			payload: testData,
+			newConfig: func() *doris.Config {
+				return &doris.Config{
+					Endpoints:   []string{"http://localhost:8630"},
+					User:        "root",
+					Password:    "password",
+					Database:    "test_db",
+					Table:       "test_table",
+					Label:       "my_custom_label_123", // User-specified custom label
+					Format:      doris.DefaultJSONFormat(),
+					Retry:       doris.DefaultRetry(),
+					GroupCommit: doris.ASYNC, // Enable group commit
+				}
+			},
+		},
+		{
+			// Demo 2: Label Prefix + Group Commit
+			title:   "\n--- Demo 2: Label Prefix + Group Commit ---",
+			notice:  "Attempting to load data, observe label prefix removal logs...",
+			payload: csvData,
+			newConfig: func() *doris.Config {
+				return &doris.Config{
+					Endpoints:   []string{"http://localhost:8630"},
+					User:        "root",
+					Password:    "password",
+					Database:    "test_db",
+					Table:       "test_table",
+					LabelPrefix: "batch_load", // User-specified label prefix
+					Format:      doris.DefaultCSVFormat(),
+					Retry:       doris.DefaultRetry(),
+					GroupCommit: doris.SYNC, // Enable group commit (SYNC mode)
+				}
+			},
+		},
+		{
+			// Demo 3: Both Label and LabelPrefix + Group Commit
+			title:   "\n--- Demo 3: Label + Label Prefix + Group Commit ---",
+			notice:  "Attempting to load data, observe removal logs for both label configurations...",
+			payload: jsonData,
+			newConfig: func() *doris.Config {
+				return &doris.Config{
+					Endpoints:   []string{"http://localhost:8630"},
+					User:        "root",
+					Password:    "password",
+					Database:    "test_db",
+					Table:       "test_table",
+					Label:       "specific_job_001", // Custom label
+					LabelPrefix: "production",       // Label prefix
+					Format:      doris.DefaultJSONFormat(),
+					Retry:       doris.DefaultRetry(),
+					GroupCommit: doris.ASYNC, // Enable group commit
+				}
+			},
+		},
+		{
+			// Demo 4: Normal case without Group Commit
+			title:   "\n--- Demo 4: Normal Case (Group Commit OFF) ---",
+			notice:  "Attempting to load data, observe normal label generation logs...",
+			payload: testData,
+			newConfig: func() *doris.Config {
+				return &doris.Config{
+					Endpoints:   []string{"http://localhost:8630"},
+					User:        "root",
+					Password:    "password",
+					Database:    "test_db",
+					Table:       "test_table",
+					Label:       "normal_label_456",
+					LabelPrefix: "normal_prefix",
+					Format:      doris.DefaultJSONFormat(),
+					Retry:       doris.DefaultRetry(),
+					GroupCommit: doris.OFF, // Group commit disabled
+				}
+			},
+		},
+	}
+
+	for _, demo := range demos {
+		fmt.Println(demo.title)
+
+		client, err := doris.NewLoadClient(demo.newConfig())
+		if err != nil {
+			fmt.Printf("Failed to create client: %v\n", err)
+			return
+		}
+
+		fmt.Println(demo.notice)
+		if _, err = client.Load(strings.NewReader(demo.payload)); err != nil {
+			fmt.Printf("Expected connection error (test environment): %v\n", err)
+		}
+	}
+
+	fmt.Println("\n=== Demo Complete ===")
+	fmt.Println("๐ก Note: The above demonstrated the label removal logging when group commit is enabled")
+	fmt.Println("๐ Log level descriptions:")
+	fmt.Println(" - WARN: Warning when user-configured label/label_prefix is removed")
+	fmt.Println(" - INFO: Compliance removal operation when group commit is enabled")
+	fmt.Println(" - DEBUG: Normal label generation process")
+}
diff --git a/sdk/go-doris-sdk/examples/production_concurrent_example.go
b/sdk/go-doris-sdk/examples/production_concurrent_example.go
new file mode 100644
index 00000000000..5b021d61bed
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/production_concurrent_example.go
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package examples demonstrates production-level concurrent large-scale data
+// loading.
+// This example shows how to efficiently load 1 million records using 10
+// concurrent workers.
+// Each worker loads 100,000 records independently for maximum throughput.
+// Best practices: worker pools, progress monitoring, comprehensive error
+// handling.
+// Uses the unified orders schema for consistency across all examples.
+package examples
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// Sizing of the production-level concurrent demo.
+const (
+	// TOTAL_RECORDS is the number of records loaded across all workers (1 million).
+	TOTAL_RECORDS = 1000000
+	// NUM_WORKERS is the number of concurrent workers.
+	NUM_WORKERS = 10
+	// RECORDS_PER_WORKER is each worker's share of TOTAL_RECORDS (100k).
+	RECORDS_PER_WORKER = TOTAL_RECORDS / NUM_WORKERS
+)
+
+// WorkerStats is the per-worker result record of one load attempt.
+type WorkerStats struct {
+	// WorkerID identifies the worker that produced this record.
+	WorkerID int
+
+	// RecordsLoaded is the number of records counted as loaded.
+	RecordsLoaded int
+	// DataSize is the size in bytes of the payload the worker generated.
+	DataSize int64
+	// LoadTime is the duration of the load call.
+	LoadTime time.Duration
+
+	// Success reports whether the load succeeded; Error holds the failure otherwise.
+	Success bool
+	Error   error
+}
+
+// GlobalStats aggregates totals across all workers. The code in this file
+// reads and updates every field through sync/atomic so that workers and the
+// progress monitor can share one instance.
+type GlobalStats struct {
+	// TotalRecordsProcessed counts records reported as successfully loaded.
+	TotalRecordsProcessed int64
+	// TotalDataSize sums the payload bytes of successful loads.
+	TotalDataSize int64
+
+	// SuccessfulWorkers and FailedWorkers count finished workers by outcome.
+	SuccessfulWorkers int64
+	FailedWorkers     int64
+}
+
+// RunConcurrentExample demonstrates production-level concurrent large-scale
+// data loading. It creates one shared load client, starts NUM_WORKERS
+// goroutines running loadWorker (staggered by 100ms), runs a progress monitor
+// alongside them, and prints a summary built from the atomically maintained
+// GlobalStats once every worker has finished. If the client cannot be created
+// the error is printed and the function returns.
+func RunConcurrentExample() {
+	fmt.Println("=== Production-Level Concurrent Large-Scale Loading Demo ===")
+
+	fmt.Printf("๐ Scale: %d total records, %d workers, %d records per worker\n",
+		TOTAL_RECORDS, NUM_WORKERS, RECORDS_PER_WORKER)
+
+	// Production-level configuration optimized for concurrent loads
+	config := &doris.Config{
+		Endpoints:   []string{"http://10.16.10.6:8630"},
+		User:        "root",
+		Password:    "",
+		Database:    "test",
+		Table:       "orders", // Unified orders table
+		LabelPrefix: "prod_concurrent",
+		Format:      doris.DefaultCSVFormat(), // Default CSV format
+		Retry:       doris.NewRetry(5, 1000),  // 5 retries with 1s base interval
+		GroupCommit: doris.ASYNC,              // ASYNC mode for maximum throughput
+	}
+
+	// Create shared client (thread-safe)
+	client, err := doris.NewLoadClient(config)
+	if err != nil {
+		fmt.Printf("Failed to create load client: %v\n", err)
+		return
+	}
+
+	fmt.Println("✅ Load client created successfully")
+
+	// Initialize global statistics and synchronization. resultChan is
+	// buffered with one slot per worker so that a worker's final send never
+	// blocks, even though nothing receives while the workers run.
+	var globalStats GlobalStats
+	var wg sync.WaitGroup
+	resultChan := make(chan WorkerStats, NUM_WORKERS)
+	progressDone := make(chan bool)
+
+	// Start progress monitor
+	go printProgressMonitor(progressDone, &globalStats)
+
+	// Record overall start time
+	overallStart := time.Now()
+
+	// Launch concurrent workers
+	fmt.Printf("๐ Launching %d concurrent workers...\n", NUM_WORKERS)
+	for i := 0; i < NUM_WORKERS; i++ {
+		wg.Add(1)
+		go loadWorker(i, client, &globalStats, &wg, resultChan)
+
+		// Small delay between worker starts to stagger the load
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Wait for all workers to complete, then stop the monitor. Closing the
+	// channel makes the monitor's receive succeed without a blocking send.
+	wg.Wait()
+	close(progressDone)
+	close(resultChan)
+
+	// Calculate overall metrics
+	overallTime := time.Since(overallStart)
+
+	// The summary uses the atomic totals only; the per-worker records left
+	// in resultChan are not needed here.
+	processed := atomic.LoadInt64(&globalStats.TotalRecordsProcessed)
+	succeeded := atomic.LoadInt64(&globalStats.SuccessfulWorkers)
+	failed := atomic.LoadInt64(&globalStats.FailedWorkers)
+	dataSize := atomic.LoadInt64(&globalStats.TotalDataSize)
+
+	// Simple results summary
+	fmt.Println("\n๐ === CONCURRENT LOAD COMPLETE ===")
+	fmt.Printf("๐ Total records processed: %d/%d\n", processed, TOTAL_RECORDS)
+	fmt.Printf("๐ Workers: %d successful, %d failed\n", succeeded, failed)
+	fmt.Printf("โฑ๏ธ Total time: %v\n", overallTime)
+	fmt.Printf("๐ Overall rate: %.0f records/sec\n", float64(processed)/overallTime.Seconds())
+	fmt.Printf("๐พ Data processed: %.1f MB\n", float64(dataSize)/1024/1024)
+
+	fmt.Println("=== Demo Complete ===")
+}
+
+// loadWorker performs the actual data loading for a single worker using
unified data generator
+func loadWorker(workerID int, client *doris.DorisLoadClient, globalStats
*GlobalStats, wg *sync.WaitGroup, resultChan chan<- WorkerStats) {
+ defer wg.Done()
+
+ stats := WorkerStats{
+ WorkerID: workerID,
+ Success: false,
+ }
+
+ fmt.Printf("Starting load operation for %d records\n",
RECORDS_PER_WORKER)
+ overallStart := time.Now()
+
+ // Generate data for this worker using unified data generator
+ genConfig := DataGeneratorConfig{
+ WorkerID: workerID,
+ BatchSize: RECORDS_PER_WORKER,
+ ContextName: fmt.Sprintf("DataGen-W%d", workerID),
+ }
+ data := GenerateOrderCSV(genConfig)
+ stats.DataSize = int64(len(data))
+
+ // Perform the load operation
+ fmt.Println("Starting load operation...")
+ loadStart := time.Now()
+
+ response, err := client.Load(doris.StringReader(data))
+ stats.LoadTime = time.Since(loadStart)
+
+ // Simple response handling
+ if err != nil {
+ stats.Error = err
+ fmt.Printf("โ Worker-%d failed: %v\n", workerID, err)
+ atomic.AddInt64(&globalStats.FailedWorkers, 1)
+ } else if response != nil && response.Status == doris.SUCCESS {
+ stats.Success = true
+ stats.RecordsLoaded = RECORDS_PER_WORKER
+
+ // Update global statistics atomically
+ atomic.AddInt64(&globalStats.TotalRecordsProcessed,
int64(RECORDS_PER_WORKER))
+ atomic.AddInt64(&globalStats.TotalDataSize, stats.DataSize)
+ atomic.AddInt64(&globalStats.SuccessfulWorkers, 1)
+
+ fmt.Printf("โ
Worker-%d completed: %d records in %v\n",
workerID, RECORDS_PER_WORKER, stats.LoadTime)
+ } else {
+ if response != nil {
+ stats.Error = fmt.Errorf("load failed with status: %v",
response.Status)
+ fmt.Printf("โ Worker-%d failed with status: %v\n",
workerID, response.Status)
+ } else {
+ stats.Error = fmt.Errorf("load failed: no response
received")
+ fmt.Printf("โ Worker-%d failed: no response\n",
workerID)
+ }
+ atomic.AddInt64(&globalStats.FailedWorkers, 1)
+ }
+
+ totalTime := time.Since(overallStart)
+ fmt.Printf("Worker completed in %v (load: %v)\n", totalTime,
stats.LoadTime)
+
+ resultChan <- stats
+}
+
+// printProgressMonitor monitors and prints progress during concurrent loading
+func printProgressMonitor(done chan bool, globalStats *GlobalStats) {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-done:
+ return
+ case <-ticker.C:
+ processed :=
atomic.LoadInt64(&globalStats.TotalRecordsProcessed)
+ successful :=
atomic.LoadInt64(&globalStats.SuccessfulWorkers)
+ failed := atomic.LoadInt64(&globalStats.FailedWorkers)
+ progress := float64(processed) / float64(TOTAL_RECORDS)
* 100
+
+ fmt.Printf("๐ Progress: %.1f%% (%d/%d records),
Workers: %d success, %d failed\n",
+ progress, processed, TOTAL_RECORDS, successful,
failed)
+ }
+ }
+}
diff --git a/sdk/go-doris-sdk/examples/production_json_example.go
b/sdk/go-doris-sdk/examples/production_json_example.go
new file mode 100644
index 00000000000..2ece9793d5b
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/production_json_example.go
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package examples demonstrates production-level JSON data loading.
+// This example shows how to efficiently load large amounts of structured JSON
+// data (50,000 records).
+// Best practices: JSON optimization, structured data, memory efficiency.
+// Uses the unified orders schema for consistency across all examples.
+package examples
+
+import (
+ "fmt"
+ "time"
+
+ doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+const (
+ // Production-level JSON batch size
+ JSON_BATCH_SIZE = 50000 // 50k JSON records
+)
+
+// RunJSONExample demonstrates production-level JSON data loading
+func RunJSONExample() {
+ fmt.Println("=== Production-Level JSON Data Loading Demo ===")
+
+ doris.SetLogLevel(doris.LogLevelInfo)
+
+ logger := doris.NewContextLogger("JSONDemo")
+ logger.Infof("Starting JSON loading demo with %d order records",
JSON_BATCH_SIZE)
+
+ // Production-level JSON configuration using direct struct construction
+ config := &doris.Config{
+ Endpoints: []string{"http://10.16.10.6:8630"},
+ User: "root",
+ Password: "",
+ Database: "test",
+ Table: "orders", // Unified orders table
+ LabelPrefix: "prod_json",
+ Format: &doris.JSONFormat{Type: doris.JSONObjectLine}, //
JSON Lines format
+ Retry: doris.NewRetry(3, 2000), //
Custom retry: 3 attempts, 2s base interval
+ GroupCommit: doris.ASYNC, //
ASYNC mode for better JSON processing
+ }
+
+ // Create client with automatic validation
+ client, err := doris.NewLoadClient(config)
+ if err != nil {
+ logger.Errorf("Failed to create load client: %v", err)
+ return
+ }
+
+ logger.Infof("โ
JSON load client created successfully")
+
+ // Generate realistic JSON order data using unified data generator
+ genConfig := DataGeneratorConfig{
+ BatchSize: JSON_BATCH_SIZE,
+ ContextName: "JSON-DataGen",
+ }
+ jsonData := GenerateOrderJSON(genConfig)
+
+ // Perform the JSON load operation
+ logger.Infof("Starting JSON load operation for %d order records...",
JSON_BATCH_SIZE)
+ loadStart := time.Now()
+
+ response, err := client.Load(doris.StringReader(jsonData))
+
+ loadTime := time.Since(loadStart)
+
+ // Simple response handling
+ if err != nil {
+ fmt.Printf("โ JSON load failed: %v\n", err)
+ return
+ }
+
+ if response != nil && response.Status == doris.SUCCESS {
+ fmt.Printf("๐ JSON load completed successfully!\n")
+ fmt.Printf("๐ JSON Records: %d, Size: %.1f MB, Time: %v\n",
JSON_BATCH_SIZE, float64(len(jsonData))/1024/1024, loadTime)
+ fmt.Printf("๐ JSON Rate: %.0f records/sec, %.1f MB/sec\n",
float64(JSON_BATCH_SIZE)/loadTime.Seconds(),
float64(len(jsonData))/1024/1024/loadTime.Seconds())
+ if response.Resp.Label != "" {
+ fmt.Printf("๐ Label: %s, Loaded: %d rows\n",
response.Resp.Label, response.Resp.NumberLoadedRows)
+ if response.Resp.LoadBytes > 0 {
+ avgBytesPerRecord :=
float64(response.Resp.LoadBytes) / float64(response.Resp.NumberLoadedRows)
+ fmt.Printf("๐ Average bytes per JSON record:
%.1f\n", avgBytesPerRecord)
+ }
+ }
+ } else {
+ fmt.Printf("โ JSON load failed with status: %v\n",
response.Status)
+ }
+
+ fmt.Println("=== JSON Demo Complete ===")
+}
diff --git a/sdk/go-doris-sdk/examples/production_single_batch_example.go
b/sdk/go-doris-sdk/examples/production_single_batch_example.go
new file mode 100644
index 00000000000..4dd69efb1fb
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/production_single_batch_example.go
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package examples demonstrates production-level single-threaded large batch loading.
+// This example shows how to efficiently load large amounts of data (100,000 records).
+// Best practices: batch size optimization, memory efficiency, proper error handling.
+// Uses unified orders schema for consistency across all examples.
+package examples
+
+import (
+ "fmt"
+ "time"
+
+ doris "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+const (
+ // BATCH_SIZE is the number of order records that RunSingleBatchExample
+ // generates and sends in its single load request.
+ BATCH_SIZE = 100000 // 100k records
+)
+
+// RunSingleBatchExample loads one large CSV batch (BATCH_SIZE generated order
+// records) into the "orders" table with a single client.Load call and prints
+// throughput statistics.
+//
+// The endpoint, credentials and table are hard-coded demo values. Failures are
+// reported on the logger or stdout; nothing is returned to the caller.
+func RunSingleBatchExample() {
+	fmt.Println("=== Production-Level Large Batch Loading Demo ===")
+
+	// Production logging level
+	doris.SetLogLevel(doris.LogLevelInfo)
+
+	logger := doris.NewContextLogger("SingleBatch")
+	logger.Infof("Starting large batch loading demo with %d order records", BATCH_SIZE)
+
+	// Production-level configuration using direct struct construction
+	config := &doris.Config{
+		Endpoints:   []string{"http://10.16.10.6:8630"},
+		User:        "root",
+		Password:    "",
+		Database:    "test",
+		Table:       "orders", // Unified orders table
+		LabelPrefix: "prod_batch",
+		Format:      doris.DefaultCSVFormat(), // Default CSV format
+		Retry:       doris.NewRetry(3, 2000),  // 3 retries with 2s base interval
+		GroupCommit: doris.ASYNC,              // ASYNC mode for better performance
+	}
+
+	// Create client with automatic validation
+	client, err := doris.NewLoadClient(config)
+	if err != nil {
+		logger.Errorf("Failed to create load client: %v", err)
+		return
+	}
+
+	logger.Infof("✅ Load client created successfully")
+
+	// Generate large batch of realistic order data using unified data generator
+	genConfig := DataGeneratorConfig{
+		BatchSize:   BATCH_SIZE,
+		ContextName: "SingleBatch-DataGen",
+	}
+	data := GenerateOrderCSV(genConfig)
+
+	// Perform the load operation
+	logger.Infof("Starting load operation for %d order records...", BATCH_SIZE)
+	loadStart := time.Now()
+
+	response, err := client.Load(doris.StringReader(data))
+
+	loadTime := time.Since(loadStart)
+
+	// Simple response handling
+	if err != nil {
+		fmt.Printf("โ Load failed: %v\n", err)
+		return
+	}
+
+	switch {
+	case response == nil:
+		// The failure branch used to read response.Status unconditionally,
+		// which would panic if the client ever returned (nil, nil).
+		fmt.Printf("Load failed: client returned no response\n")
+	case response.Status == doris.SUCCESS:
+		fmt.Printf("๐ Load completed successfully!\n")
+		fmt.Printf("๐ Records: %d, Size: %.1f MB, Time: %v\n", BATCH_SIZE, float64(len(data))/1024/1024, loadTime)
+		fmt.Printf("๐ Rate: %.0f records/sec, %.1f MB/sec\n", float64(BATCH_SIZE)/loadTime.Seconds(), float64(len(data))/1024/1024/loadTime.Seconds())
+		if response.Resp.Label != "" {
+			fmt.Printf("๐ Label: %s, Loaded: %d rows\n", response.Resp.Label, response.Resp.NumberLoadedRows)
+		}
+	default:
+		fmt.Printf("โ Load failed with status: %v\n", response.Status)
+	}
+
+	fmt.Println("=== Demo Complete ===")
+}
diff --git a/sdk/go-doris-sdk/examples/simple_config_example.go
b/sdk/go-doris-sdk/examples/simple_config_example.go
new file mode 100644
index 00000000000..3d75bf6f3e6
--- /dev/null
+++ b/sdk/go-doris-sdk/examples/simple_config_example.go
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package examples
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/apache/doris/sdk/go-doris-sdk"
+)
+
+// SimpleConfigExample builds a Config literal with the default JSON format and
+// default retry policy, loads three newline-separated JSON objects into
+// test_db.test_table and prints the outcome to stdout.
+func SimpleConfigExample() {
+	// Extra stream load options passed through Config.Options
+	loadOptions := map[string]string{
+		"strict_mode":      "true",
+		"max_filter_ratio": "0.1",
+	}
+
+	// Directly construct Config struct using new default functions
+	cfg := &doris.Config{
+		Endpoints:   []string{"http://10.16.10.6:8630"},
+		User:        "root",
+		Password:    "password",
+		Database:    "test_db",
+		Table:       "test_table",
+		Format:      doris.DefaultJSONFormat(), // Use new default JSON format
+		Retry:       doris.DefaultRetry(),      // Use new default retry strategy
+		GroupCommit: doris.ASYNC,
+		Options:     loadOptions,
+	}
+
+	// Create client
+	loadClient, clientErr := doris.NewLoadClient(cfg)
+	if clientErr != nil {
+		fmt.Printf("Failed to create client: %v\n", clientErr)
+		return
+	}
+
+	// Prepare data: one JSON object per line
+	payload := `{"id": 1, "name": "Alice", "age": 30}
+{"id": 2, "name": "Bob", "age": 25}
+{"id": 3, "name": "Charlie", "age": 35}`
+
+	// Execute load
+	result, loadErr := loadClient.Load(strings.NewReader(payload))
+	if loadErr != nil {
+		fmt.Printf("Load failed: %v\n", loadErr)
+		return
+	}
+
+	fmt.Printf("Load completed successfully!\n")
+	fmt.Printf("Status: %s\n", result.Status)
+	if result.Status != doris.SUCCESS {
+		return
+	}
+	fmt.Printf("Loaded rows: %d\n", result.Resp.NumberLoadedRows)
+	fmt.Printf("Load bytes: %d\n", result.Resp.LoadBytes)
+}
diff --git a/sdk/go-doris-sdk/go.mod b/sdk/go-doris-sdk/go.mod
new file mode 100644
index 00000000000..ea5c6219cde
--- /dev/null
+++ b/sdk/go-doris-sdk/go.mod
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+module github.com/apache/doris/sdk/go-doris-sdk
+
+go 1.20
+
+require (
+ github.com/google/uuid v1.4.0
+ github.com/json-iterator/go v1.1.12
+)
+
+require (
+ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 //
indirect
+ github.com/modern-go/reflect2 v1.0.2 // indirect
+)
diff --git a/sdk/go-doris-sdk/go.sum b/sdk/go-doris-sdk/go.sum
new file mode 100644
index 00000000000..b8a68bf6a0c
--- /dev/null
+++ b/sdk/go-doris-sdk/go.sum
@@ -0,0 +1,17 @@
+github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
+github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
+github.com/google/uuid v1.4.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/json-iterator/go v1.1.12
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
+github.com/json-iterator/go v1.1.12/go.mod
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421
h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
+github.com/modern-go/reflect2 v1.0.2/go.mod
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
diff --git a/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
b/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
new file mode 100644
index 00000000000..7d231827fb9
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/client/doris_load_client.go
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package client provides the main client interface for Doris Stream Load
+package client
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/config"
+ loader "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/loader"
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/log"
+)
+
+// Substring tables used by isRetryableError. They are plain lower-case
+// strings (nothing is compiled) matched with strings.Contains against a
+// lower-cased message.
+var (
+ // retryableErrorPatterns is matched against err.Error() of a failed
+ // attempt. "timeout" already covers "connection timeout" and
+ // "i/o timeout"; the longer entries are redundant but harmless.
+ retryableErrorPatterns = []string{
+ "connection refused",
+ "connection reset",
+ "connection timeout",
+ "timeout",
+ "network is unreachable",
+ "no such host",
+ "temporary failure",
+ "dial tcp",
+ "i/o timeout",
+ "eof",
+ "broken pipe",
+ "connection aborted",
+ "307 temporary redirect",
+ "302 found",
+ "301 moved permanently",
+ }
+
+ // retryableResponsePatterns is matched against the ErrorMessage of a
+ // response whose Status is FAILURE.
+ retryableResponsePatterns = []string{
+ "connect",
+ "unavailable",
+ "timeout",
+ "redirect",
+ }
+
+ // Pool of *strings.Builder values.
+ // NOTE(review): nothing in this file reads from the pool - confirm it is
+ // still needed.
+ stringBuilderPool = sync.Pool{
+ New: func() interface{} {
+ return &strings.Builder{}
+ },
+ }
+)
+
+// DorisLoadClient is the client used to load data into Doris via stream load.
+// Instances are created with NewDorisClient.
+type DorisLoadClient struct {
+ // streamLoader executes the HTTP request built for each load attempt.
+ streamLoader *loader.StreamLoader
+ // config is the configuration passed to NewDorisClient; Load reads it on
+ // every call.
+ config *config.Config
+}
+
+// NewDorisClient creates a new DorisLoadClient instance with the given
+// configuration.
+//
+// It returns an error wrapping the validation failure when cfg is nil or
+// cfg.ValidateInternal rejects it. The client keeps the cfg pointer; it does
+// not copy the configuration.
+func NewDorisClient(cfg *config.Config) (*DorisLoadClient, error) {
+	// A nil configuration would otherwise panic inside ValidateInternal,
+	// which dereferences its receiver.
+	if cfg == nil {
+		return nil, fmt.Errorf("invalid configuration: config cannot be nil")
+	}
+
+	// Validate the configuration
+	if err := cfg.ValidateInternal(); err != nil {
+		return nil, fmt.Errorf("invalid configuration: %w", err)
+	}
+
+	return &DorisLoadClient{
+		streamLoader: loader.NewStreamLoader(),
+		config:       cfg,
+	}, nil
+}
+
+// isRetryableError reports whether a failed attempt should be retried.
+//
+// With a non-nil err, the attempt is retryable when err itself is a
+// net.Error reporting Timeout or Temporary, or when its lower-cased message
+// contains one of retryableErrorPatterns. With a nil err, it is retryable
+// only when response has Status FAILURE and a non-empty ErrorMessage
+// containing one of retryableResponsePatterns.
+func isRetryableError(err error, response *loader.LoadResponse) bool {
+	if err == nil {
+		// No transport error: classify by the message carried in the response.
+		if response == nil || response.Status != loader.FAILURE || response.ErrorMessage == "" {
+			return false
+		}
+		message := strings.ToLower(response.ErrorMessage)
+		for i := range retryableResponsePatterns {
+			if strings.Contains(message, retryableResponsePatterns[i]) {
+				return true
+			}
+		}
+		return false
+	}
+
+	// Typed check first: only err itself is inspected, not errors it wraps.
+	if ne, isNetErr := err.(net.Error); isNetErr && (ne.Timeout() || ne.Temporary()) {
+		return true
+	}
+
+	// Fall back to substring matching on the lower-cased message.
+	message := strings.ToLower(err.Error())
+	for i := range retryableErrorPatterns {
+		if strings.Contains(message, retryableErrorPatterns[i]) {
+			return true
+		}
+	}
+	return false
+}
+
+// calculateBackoffInterval returns the delay to wait before retry number
+// attempt (1-based): baseIntervalMs * 2^(attempt-1), limited as follows.
+//
+//   - attempt <= 0 yields 0.
+//   - When maxTotalTimeMs > 0, the delay is limited to the time remaining
+//     (maxTotalTimeMs - currentRetryTimeMs) minus a 5 second reserve for the
+//     request itself. If that budget is not positive, no limit is applied
+//     here; the caller is expected to notice the overrun.
+//   - The delay never exceeds 5 minutes and is never negative.
+//
+// The exponential value is built by repeated doubling that stops once the
+// 5 minute cap is reached, so a large attempt number saturates at the cap
+// instead of overflowing int64 and collapsing to 0.
+func calculateBackoffInterval(attempt int, baseIntervalMs int64, maxTotalTimeMs int64, currentRetryTimeMs int64) time.Duration {
+	if attempt <= 0 {
+		return 0
+	}
+
+	// Absolute maximum (5 minutes) to prevent extreme cases
+	const absoluteMaxIntervalMs = 300000
+
+	// Doubling only continues while the value is below the cap, so it cannot
+	// overflow; any value at or above the cap is clamped below anyway.
+	intervalMs := baseIntervalMs
+	for i := 1; i < attempt && intervalMs > 0 && intervalMs < absoluteMaxIntervalMs; i++ {
+		intervalMs *= 2
+	}
+
+	// If there's a total time limit, constrain the interval dynamically
+	if maxTotalTimeMs > 0 {
+		remainingTimeMs := maxTotalTimeMs - currentRetryTimeMs
+		// Reserve some time for the actual request (estimate ~5 seconds)
+		maxAllowedIntervalMs := remainingTimeMs - 5000
+
+		if maxAllowedIntervalMs > 0 && intervalMs > maxAllowedIntervalMs {
+			intervalMs = maxAllowedIntervalMs
+		}
+	}
+
+	if intervalMs > absoluteMaxIntervalMs {
+		intervalMs = absoluteMaxIntervalMs
+	}
+
+	// Ensure interval is non-negative (a negative base interval yields 0)
+	if intervalMs < 0 {
+		intervalMs = 0
+	}
+
+	return time.Duration(intervalMs) * time.Millisecond
+}
+
+// Load sends the content of reader to Doris via HTTP stream load, retrying
+// attempts that isRetryableError classifies as transient.
+//
+// Retry policy comes from c.config.Retry; when that is nil, 6 retries with a
+// 1000ms base interval and a 60000ms total limit are used. Delays between
+// attempts come from calculateBackoffInterval.
+//
+// If reader implements io.Seeker it is rewound to offset 0 before every
+// attempt; otherwise its whole content is buffered in memory first so it can
+// be replayed.
+//
+// On success the response is returned with a nil error. On failure the last
+// response (possibly nil) is returned together with a non-nil error.
+func (c *DorisLoadClient) Load(reader io.Reader) (*loader.LoadResponse, error) {
+	// A nil reader would panic in buf.ReadFrom below.
+	if reader == nil {
+		return nil, fmt.Errorf("reader cannot be nil")
+	}
+
+	operationStartTime := time.Now()
+
+	// Step 1: Configuration preparation
+	retry := c.config.Retry
+	if retry == nil {
+		retry = &config.Retry{MaxRetryTimes: 6, BaseIntervalMs: 1000, MaxTotalTimeMs: 60000}
+	}
+
+	maxRetries := retry.MaxRetryTimes
+	baseIntervalMs := retry.BaseIntervalMs
+	maxTotalTimeMs := retry.MaxTotalTimeMs
+
+	log.Infof("Starting stream load operation")
+	log.Infof("Target: %s.%s", c.config.Database, c.config.Table)
+
+	// Show the actual retry strategy to avoid confusion
+	if maxRetries > 0 {
+		// Calculate and show the actual retry intervals
+		var intervals []string
+		totalTimeMs := int64(0)
+		for i := 1; i <= maxRetries; i++ {
+			// Calculate what the interval would be at this point
+			simulatedInterval := calculateBackoffInterval(i, baseIntervalMs, maxTotalTimeMs, totalTimeMs)
+			intervalMs := simulatedInterval.Milliseconds()
+			intervals = append(intervals, fmt.Sprintf("%dms", intervalMs))
+			totalTimeMs += intervalMs
+		}
+		log.Debugf("Retry strategy: exponential backoff with max %d attempts, intervals: [%s], estimated max time: %dms (limit: %dms)",
+			maxRetries, strings.Join(intervals, ", "), totalTimeMs, maxTotalTimeMs)
+	} else {
+		log.Debugf("Retry disabled (maxRetries=0)")
+	}
+
+	// Prepare for retries by handling reader consumption
+	var getBodyFunc func() (io.Reader, error)
+
+	// Check if reader supports seeking
+	if seeker, ok := reader.(io.Seeker); ok {
+		// Reader supports seeking, we can reuse it
+		getBodyFunc = func() (io.Reader, error) {
+			if _, err := seeker.Seek(0, io.SeekStart); err != nil {
+				return nil, fmt.Errorf("failed to seek to start: %w", err)
+			}
+			return reader, nil
+		}
+	} else {
+		// Reader doesn't support seeking, buffer the content
+		var buf bytes.Buffer
+		if _, err := buf.ReadFrom(reader); err != nil {
+			return nil, fmt.Errorf("failed to buffer reader content: %w", err)
+		}
+
+		getBodyFunc = func() (io.Reader, error) {
+			// A fresh reader over the same bytes; the buffer is never consumed
+			return bytes.NewReader(buf.Bytes()), nil
+		}
+	}
+
+	var lastErr error
+	var response *loader.LoadResponse
+	startTime := time.Now()
+	totalRetryTime := int64(0)
+	// attemptsMade counts requests actually sent, so the final log lines
+	// stay correct when the loop stops early.
+	attemptsMade := 0
+
+	// Try the operation with retries
+	for attempt := 0; attempt <= maxRetries; attempt++ {
+		if attempt > 0 {
+			log.Infof("Retry attempt %d/%d", attempt, maxRetries)
+		} else {
+			log.Infof("Initial load attempt")
+		}
+
+		// Calculate and apply backoff delay for retries
+		if attempt > 0 {
+			backoffInterval := calculateBackoffInterval(attempt, baseIntervalMs, maxTotalTimeMs, totalRetryTime)
+
+			// Check if this delay would exceed the total time limit
+			if maxTotalTimeMs > 0 && totalRetryTime+backoffInterval.Milliseconds() > maxTotalTimeMs {
+				log.Warnf("Next retry delay (%v) would exceed total time limit (%dms). Current total retry time: %dms. Stopping retries.",
+					backoffInterval, maxTotalTimeMs, totalRetryTime)
+				break
+			}
+
+			log.Infof("Waiting %v before retry attempt (total retry time so far: %dms)", backoffInterval, totalRetryTime)
+			time.Sleep(backoffInterval)
+			totalRetryTime += backoffInterval.Milliseconds()
+		}
+
+		// Get a fresh reader for this attempt
+		currentReader, err := getBodyFunc()
+		if err != nil {
+			log.Errorf("Failed to get reader for attempt %d: %v", attempt+1, err)
+			lastErr = fmt.Errorf("failed to get reader: %w", err)
+			break
+		}
+
+		// Create the HTTP request
+		req, err := loader.CreateStreamLoadRequest(c.config, currentReader, attempt)
+		if err != nil {
+			log.Errorf("Failed to create HTTP request: %v", err)
+			lastErr = fmt.Errorf("failed to create request: %w", err)
+			// Request creation failure is usually not retryable (config issue)
+			break
+		}
+
+		// Execute the actual load operation
+		attemptsMade++
+		response, lastErr = c.streamLoader.Load(req)
+
+		// If successful, return immediately
+		if lastErr == nil && response != nil && response.Status == loader.SUCCESS {
+			log.Infof("Stream load operation completed successfully on attempt %d", attempt+1)
+			return response, nil
+		}
+
+		// Check if this error/response should be retried
+		shouldRetry := isRetryableError(lastErr, response)
+
+		if lastErr != nil {
+			log.Errorf("Attempt %d failed with error: %v (retryable: %t)", attempt+1, lastErr, shouldRetry)
+		} else if response != nil && response.Status == loader.FAILURE {
+			log.Errorf("Attempt %d failed with status: %s (retryable: %t)", attempt+1, response.Resp.Status, shouldRetry)
+			if response.ErrorMessage != "" {
+				log.Errorf("Error details: %s", response.ErrorMessage)
+			}
+		}
+
+		// Early exit for non-retryable errors
+		if !shouldRetry {
+			log.Warnf("Error is not retryable, stopping retry attempts")
+			break
+		}
+
+		// If this is the last attempt, don't continue
+		if attempt == maxRetries {
+			log.Warnf("Reached maximum retry attempts (%d), stopping", maxRetries)
+			break
+		}
+
+		// Check total elapsed time (including processing time, not just retry delays)
+		elapsedTime := time.Since(startTime)
+		if maxTotalTimeMs > 0 && elapsedTime.Milliseconds() > maxTotalTimeMs {
+			log.Warnf("Total operation time (%v) exceeded limit (%dms), stopping retries", elapsedTime, maxTotalTimeMs)
+			break
+		}
+	}
+
+	// Final result logging
+	totalOperationTime := time.Since(operationStartTime)
+	log.Debugf("[TIMING] Total operation time: %v", totalOperationTime)
+
+	if lastErr != nil {
+		log.Errorf("Stream load operation failed after %d attempts: %v", attemptsMade, lastErr)
+		return response, lastErr
+	}
+
+	if response != nil {
+		log.Errorf("Stream load operation failed with final status: %v", response.Status)
+		return response, fmt.Errorf("load failed with status: %v", response.Status)
+	}
+
+	// The format string has two verbs; the original call supplied only one
+	// argument, so the total time was never printed.
+	log.Errorf("Stream load operation failed with unknown error after %d attempts (total time: %v)", attemptsMade, totalOperationTime)
+	return nil, fmt.Errorf("load failed: unknown error")
+}
diff --git a/sdk/go-doris-sdk/pkg/load/config/load_config.go
b/sdk/go-doris-sdk/pkg/load/config/load_config.go
new file mode 100644
index 00000000000..60f5c3b3346
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/config/load_config.go
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+ "fmt"
+)
+
+// Format describes the data format of a stream load payload. JSONFormat and
+// CSVFormat in this file implement it.
+type Format interface {
+ // GetFormatType returns the format name; the implementations in this
+ // file return "json" or "csv".
+ GetFormatType() string
+ // GetOptions returns the format-specific options as key/value pairs
+ // intended to be sent as request headers. Implementations here return a
+ // freshly allocated map on every call.
+ GetOptions() map[string]string
+}
+
+// JSONFormatType selects how a JSON payload is laid out.
+type JSONFormatType string
+
+const (
+	JSONObjectLine JSONFormatType = "object_line" // JSON objects separated by newlines
+	JSONArray      JSONFormatType = "array"       // JSON array
+)
+
+// JSONFormat is the Format implementation for JSON payloads.
+// Usage: &JSONFormat{Type: JSONObjectLine} or &JSONFormat{Type: JSONArray}
+type JSONFormat struct {
+	Type JSONFormatType
+}
+
+// GetFormatType implements Format and always reports "json".
+func (f *JSONFormat) GetFormatType() string {
+	const formatName = "json"
+	return formatName
+}
+
+// GetOptions implements Format. The returned map always holds
+// "format": "json"; JSONObjectLine adds strip_outer_array=false and
+// read_json_by_line=true, JSONArray adds strip_outer_array=true, and any
+// other Type adds nothing.
+func (f *JSONFormat) GetOptions() map[string]string {
+	headers := map[string]string{"format": "json"}
+
+	if f.Type == JSONObjectLine {
+		headers["strip_outer_array"] = "false"
+		headers["read_json_by_line"] = "true"
+	} else if f.Type == JSONArray {
+		headers["strip_outer_array"] = "true"
+	}
+
+	return headers
+}
+
+// CSVFormat is the Format implementation for CSV payloads.
+// Usage: &CSVFormat{ColumnSeparator: ",", LineDelimiter: "\n"}
+type CSVFormat struct {
+	ColumnSeparator string
+	LineDelimiter   string
+}
+
+// GetFormatType implements Format and always reports "csv".
+func (f *CSVFormat) GetFormatType() string {
+	const formatName = "csv"
+	return formatName
+}
+
+// GetOptions implements Format. The returned map holds "format": "csv" plus
+// the separator and delimiter exactly as stored in the struct, even when
+// they are empty.
+func (f *CSVFormat) GetOptions() map[string]string {
+	return map[string]string{
+		"format":           "csv",
+		"column_separator": f.ColumnSeparator,
+		"line_delimiter":   f.LineDelimiter,
+	}
+}
+
+// GroupCommitMode defines the group commit mode.
+// The values are assigned with iota, so the zero value is SYNC: a Config
+// that leaves GroupCommit unset therefore carries SYNC, not OFF.
+type GroupCommitMode int
+
+const (
+ // SYNC represents synchronous group commit mode (value 0)
+ SYNC GroupCommitMode = iota
+ // ASYNC represents asynchronous group commit mode (value 1)
+ ASYNC
+ // OFF represents disabled group commit mode (value 2)
+ OFF
+)
+
+// Retry contains configuration for retry attempts when loading data.
+// ValidateInternal rejects negative values in any field.
+type Retry struct {
+ MaxRetryTimes int // Maximum number of retry attempts
+ BaseIntervalMs int64 // Base interval in milliseconds for exponential backoff
+ MaxTotalTimeMs int64 // Maximum total time for all retries in milliseconds
+}
+
+// Config contains all configuration for stream load operations.
+// ValidateInternal requires User, Database, Table, at least one endpoint and
+// a non-nil Format; the remaining fields are optional.
+type Config struct {
+ Endpoints []string
+ User string
+ Password string
+ Database string
+ Table string
+ LabelPrefix string
+ Label string
+ Format Format // Can be &JSONFormat{...} or &CSVFormat{...}
+ Retry *Retry // Optional; only validated when non-nil
+ GroupCommit GroupCommitMode // Zero value is SYNC
+ Options map[string]string
+}
+
+// ValidateInternal checks that the configuration is usable.
+//
+// It returns an error when the receiver is nil, when User, Database or Table
+// is empty, when Endpoints is empty, when Format is nil, or when Retry is set
+// and any of its fields is negative. Endpoint URLs, Password, labels and
+// Options are not inspected.
+func (c *Config) ValidateInternal() error {
+	// Guard the receiver so a nil *Config yields an error instead of a panic.
+	if c == nil {
+		return fmt.Errorf("config cannot be nil")
+	}
+
+	if c.User == "" {
+		return fmt.Errorf("user cannot be empty")
+	}
+
+	if c.Database == "" {
+		return fmt.Errorf("database cannot be empty")
+	}
+
+	if c.Table == "" {
+		return fmt.Errorf("table cannot be empty")
+	}
+
+	if len(c.Endpoints) == 0 {
+		return fmt.Errorf("endpoints cannot be empty")
+	}
+
+	if c.Format == nil {
+		return fmt.Errorf("format cannot be nil")
+	}
+
+	if c.Retry != nil {
+		if c.Retry.MaxRetryTimes < 0 {
+			return fmt.Errorf("maxRetryTimes cannot be negative")
+		}
+		// The message names the field actually checked (BaseIntervalMs);
+		// it previously said "retryIntervalMs", which does not exist.
+		if c.Retry.BaseIntervalMs < 0 {
+			return fmt.Errorf("baseIntervalMs cannot be negative")
+		}
+		if c.Retry.MaxTotalTimeMs < 0 {
+			return fmt.Errorf("maxTotalTimeMs cannot be negative")
+		}
+	}
+
+	return nil
+}
diff --git a/sdk/go-doris-sdk/pkg/load/exception/stream_load_error.go
b/sdk/go-doris-sdk/pkg/load/exception/stream_load_error.go
new file mode 100644
index 00000000000..13b0dffabd2
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/exception/stream_load_error.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package exception provides error types used in the Doris Stream Load client
+package exception
+
+// StreamLoadError is the error type for failures that occur during a stream
+// load operation. It carries nothing but a message.
+type StreamLoadError struct {
+	Message string
+}
+
+// Error implements the error interface by returning Message unchanged.
+func (e *StreamLoadError) Error() string {
+	text := e.Message
+	return text
+}
+
+// NewStreamLoadError returns a pointer to a StreamLoadError holding message.
+func NewStreamLoadError(message string) *StreamLoadError {
+	loadErr := new(StreamLoadError)
+	loadErr.Message = message
+	return loadErr
+}
diff --git a/sdk/go-doris-sdk/pkg/load/load.go
b/sdk/go-doris-sdk/pkg/load/load.go
new file mode 100644
index 00000000000..362c6379255
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/load.go
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package load provides the Doris Stream Load client functionality
+package load
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "os"
+ "strings"
+
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/client"
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/config"
+ loader "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/loader"
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/log"
+)
+
+// ================================
+// Public API Types
+// ================================
+
+// Config is the main configuration structure for stream load operations
+type Config = config.Config
+
+// DorisLoadClient provides functionality to load data into Doris using the stream load API
+type DorisLoadClient = client.DorisLoadClient
+
+// Format aliases
+type Format = config.Format
+type JSONFormatType = config.JSONFormatType
+type JSONFormat = config.JSONFormat
+type CSVFormat = config.CSVFormat
+
+// Config aliases (for backward compatibility).
+// LoadSetting is the same type as Config, and BatchMode is the same type as
+// GroupCommitMode.
+type LoadSetting = config.Config
+type BatchMode = config.GroupCommitMode
+type GroupCommitMode = config.GroupCommitMode
+type Retry = config.Retry
+
+// Log aliases
+type LogLevel = log.Level
+type LogFunc = log.LogFunc
+type ContextLogger = log.ContextLogger
+
+// Load aliases
+type LoadResponse = loader.LoadResponse
+type LoadStatus = loader.LoadStatus
+type RespContent = loader.RespContent
+
+// ================================
+// Constants
+// ================================
+
+// Re-exports of constants from the config, loader and log sub-packages.
+const (
+ // JSON format constants
+ JSONObjectLine = config.JSONObjectLine
+ JSONArray = config.JSONArray
+
+ // Group commit mode constants (values of GroupCommitMode / BatchMode)
+ SYNC = config.SYNC
+ ASYNC = config.ASYNC
+ OFF = config.OFF
+
+ // Load status constants
+ SUCCESS = loader.SUCCESS
+ FAILURE = loader.FAILURE
+
+ // Log level constants
+ LogLevelDebug = log.LevelDebug
+ LogLevelInfo = log.LevelInfo
+ LogLevelWarn = log.LevelWarn
+ LogLevelError = log.LevelError
+)
+
+// ================================
+// Client Creation Functions
+// ================================
+
+// NewLoadClient creates a new Doris stream load client with the given configuration.
+// It delegates to client.NewDorisClient and returns that function's result
+// unchanged.
+func NewLoadClient(cfg *Config) (*DorisLoadClient, error) {
+ return client.NewDorisClient(cfg)
+}
+
+// ================================
+// Retry Configuration
+// ================================
+
+// NewRetry returns a retry configuration with the given retry count and base
+// backoff interval (milliseconds). MaxTotalTimeMs is fixed at 60000.
+func NewRetry(maxRetryTimes int, baseIntervalMs int64) *Retry {
+	// Default 60 seconds total
+	const defaultMaxTotalTimeMs = 60000
+
+	policy := new(Retry)
+	policy.MaxRetryTimes = maxRetryTimes
+	policy.BaseIntervalMs = baseIntervalMs
+	policy.MaxTotalTimeMs = defaultMaxTotalTimeMs
+	return policy
+}
+
+// DefaultRetry creates a new retry configuration with default values (6 retries, 1 second base interval, 60s total).
+// The nominal exponential backoff delays are 1s, 2s, 4s, 8s, 16s, 32s, which
+// add up to 63 seconds. That is more than MaxTotalTimeMs, so the later delays
+// are shortened or skipped by the client's backoff logic.
+func DefaultRetry() *Retry {
+ return &Retry{
+ MaxRetryTimes: 6, // Maximum 6 retries
+ BaseIntervalMs: 1000, // 1 second base interval
+ MaxTotalTimeMs: 60000, // 60 seconds total limit
+ }
+}
+
+// NewDefaultRetry is equivalent to DefaultRetry: it returns a new retry
+// configuration with 6 retries, a 1 second base interval and a 60 second
+// total limit.
+func NewDefaultRetry() *Retry {
+ return DefaultRetry()
+}
+
+// ================================
+// Format Configuration
+// ================================
+
+// DefaultJSONFormat creates a default JSON format configuration (JSONObjectLine).
+// A new value is returned on every call.
+func DefaultJSONFormat() *JSONFormat {
+ return &JSONFormat{Type: JSONObjectLine}
+}
+
+// DefaultCSVFormat creates a default CSV format configuration with a comma
+// column separator. LineDelimiter is the two-character sequence backslash
+// followed by 'n', not an actual newline byte.
+// NOTE(review): presumably the server interprets that escape as a newline -
+// confirm against the Doris stream load documentation.
+func DefaultCSVFormat() *CSVFormat {
+ return &CSVFormat{
+ ColumnSeparator: ",",
+ LineDelimiter: "\\n",
+ }
+}
+
+// ================================
+// Data Conversion Helpers
+// ================================
+
+// StringReader converts string data to io.Reader.
+// The result is a *strings.Reader, which also implements io.Seeker.
+func StringReader(data string) io.Reader {
+ return strings.NewReader(data)
+}
+
+// BytesReader converts byte data to io.Reader.
+// The result is a *bytes.Reader over data itself (no copy is made), so the
+// caller must not modify data while the reader is in use.
+func BytesReader(data []byte) io.Reader {
+ return bytes.NewReader(data)
+}
+
+// JSONReader marshals data with encoding/json and returns a reader over the
+// encoded bytes.
+//
+// It returns a nil reader and the marshalling error when data cannot be
+// encoded. The returned reader is seekable.
+func JSONReader(data interface{}) (io.Reader, error) {
+	jsonBytes, err := json.Marshal(data)
+	if err != nil {
+		return nil, err
+	}
+	// Read straight from the marshalled bytes; converting them to a string
+	// first would copy the whole payload for no benefit.
+	return bytes.NewReader(jsonBytes), nil
+}
+
+// ================================
+// Log Control Functions
+// ================================
+
+// SetLogLevel sets the minimum log level for the SDK by forwarding level to
+// log.SetLevel.
+// Available levels: LogLevelDebug, LogLevelInfo, LogLevelWarn, LogLevelError
+func SetLogLevel(level LogLevel) {
+ log.SetLevel(level)
+}
+
+// SetLogOutput sets the output destination for SDK logs by forwarding output
+// to log.SetOutput. Only *os.File destinations are accepted by this signature.
+func SetLogOutput(output *os.File) {
+ log.SetOutput(output)
+}
+
+// DisableLogging completely disables all SDK logging by setting the level to
+// log.Level(999).
+// NOTE(review): presumably 999 is above every defined level - confirm in the
+// log package.
+func DisableLogging() {
+ log.SetLevel(log.Level(999))
+}
+
+// SetCustomLogFunc installs fn as the log function for one level so callers
+// can route SDK logs into their own logging system. Levels other than
+// debug, info, warn and error are ignored. fn is forwarded as given, with no
+// nil check.
+func SetCustomLogFunc(level LogLevel, fn LogFunc) {
+	if level == log.LevelDebug {
+		log.SetDebugFunc(fn)
+	} else if level == log.LevelInfo {
+		log.SetInfoFunc(fn)
+	} else if level == log.LevelWarn {
+		log.SetWarnFunc(fn)
+	} else if level == log.LevelError {
+		log.SetErrorFunc(fn)
+	}
+}
+
+// SetCustomLogFuncs allows setting all log functions at once.
+// A nil argument leaves the function for that level unchanged.
+func SetCustomLogFuncs(debugFn, infoFn, warnFn, errorFn LogFunc) {
+ if debugFn != nil {
+ log.SetDebugFunc(debugFn)
+ }
+ if infoFn != nil {
+ log.SetInfoFunc(infoFn)
+ }
+ if warnFn != nil {
+ log.SetWarnFunc(warnFn)
+ }
+ if errorFn != nil {
+ log.SetErrorFunc(errorFn)
+ }
+}
+
+// NewContextLogger creates a context logger with the given context string by
+// delegating to log.NewContextLogger.
+func NewContextLogger(context string) *ContextLogger {
+ return log.NewContextLogger(context)
+}
diff --git a/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
b/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
new file mode 100644
index 00000000000..38833e6331f
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/loader/request_builder.go
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package load
+
+import (
+ "encoding/base64"
+ "fmt"
+ "io"
+ "math/rand"
+ "net/http"
+ "net/url"
+ "time"
+
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/config"
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/log"
+ "github.com/google/uuid"
+)
+
+const (
+ // StreamLoadPattern is the fmt template of the stream load URL. Its three
+ // verbs are filled with host, database and table, in that order. The scheme
+ // is fixed to http regardless of the scheme of the configured endpoint.
+ StreamLoadPattern = "http://%s/api/%s/%s/_stream_load"
+)
+
+// getNode picks one endpoint at random and returns its host[:port] part.
+//
+// Endpoints must be absolute URLs such as "http://127.0.0.1:8030". It returns
+// an error when endpoints is empty, when the chosen endpoint cannot be parsed,
+// or when it parses without a host. The last case covers scheme-less values
+// like "localhost:8030", which url.Parse accepts (reading "localhost" as the
+// scheme) and which would otherwise yield an empty host and a broken
+// "http:///api/..." request URL.
+func getNode(endpoints []string) (string, error) {
+	if len(endpoints) == 0 {
+		return "", fmt.Errorf("no endpoints available")
+	}
+
+	// The package-level rand functions are safe for concurrent use.
+	endpoint := endpoints[rand.Intn(len(endpoints))]
+
+	endpointURL, err := url.Parse(endpoint)
+	if err != nil {
+		return "", fmt.Errorf("invalid endpoint URL %q: %w", endpoint, err)
+	}
+	if endpointURL.Host == "" {
+		return "", fmt.Errorf("invalid endpoint URL %q: missing scheme or host, expected a form like http://host:port", endpoint)
+	}
+
+	return endpointURL.Host, nil
+}
+
+// CreateStreamLoadRequest creates an HTTP PUT request for Doris stream load.
+//
+// A host is chosen at random from cfg.Endpoints and combined with cfg.Database
+// and cfg.Table through StreamLoadPattern. The request carries HTTP basic
+// authentication built from cfg.User and cfg.Password, an
+// "Expect: 100-continue" header, one header per entry returned by
+// buildStreamLoadOptions, and a label header unless group commit is active
+// (see handleLabelForRequest). attempt is the zero-based retry counter and only
+// influences the generated label.
+//
+// It returns an error when cfg is nil, when no usable endpoint is available or
+// when http.NewRequest rejects the URL.
+func CreateStreamLoadRequest(cfg *config.Config, data io.Reader, attempt int) (*http.Request, error) {
+	if cfg == nil {
+		return nil, fmt.Errorf("stream load config is nil")
+	}
+
+	// Get a random endpoint host
+	host, err := getNode(cfg.Endpoints)
+	if err != nil {
+		return nil, err
+	}
+
+	// Database and table become path segments, so escape them; plain
+	// identifiers pass through unchanged.
+	loadURL := fmt.Sprintf(StreamLoadPattern, host, url.PathEscape(cfg.Database), url.PathEscape(cfg.Table))
+
+	// Create the HTTP PUT request
+	req, err := http.NewRequest(http.MethodPut, loadURL, data)
+	if err != nil {
+		return nil, err
+	}
+
+	// Add basic authentication
+	authInfo := fmt.Sprintf("%s:%s", cfg.User, cfg.Password)
+	encodedAuth := base64.StdEncoding.EncodeToString([]byte(authInfo))
+	req.Header.Set("Authorization", "Basic "+encodedAuth)
+
+	// Add common headers
+	req.Header.Set("Expect", "100-continue")
+
+	// Build and add all stream load options as headers
+	allOptions := buildStreamLoadOptions(cfg)
+	for key, value := range allOptions {
+		req.Header.Set(key, value)
+	}
+
+	// Handle label generation based on group commit usage
+	handleLabelForRequest(cfg, req, allOptions, attempt)
+
+	return req, nil
+}
+
+// handleLabelForRequest sets or strips the "label" header depending on whether
+// group commit is active for this request.
+//
+// Group commit counts as active when allOptions holds a "group_commit" entry
+// whose value is neither empty nor "off_mode". Such an entry can come from
+// cfg.GroupCommit or straight from user-supplied cfg.Options. In that case no
+// label is generated, a warning is logged for a configured Label or
+// LabelPrefix, and any "label" header already copied from cfg.Options is
+// deleted so the request really is label-free.
+//
+// Otherwise a label is produced by generateLabel and written to the header,
+// replacing a "label" option if one was present.
+func handleLabelForRequest(cfg *config.Config, req *http.Request, allOptions map[string]string, attempt int) {
+	// NOTE(review): the comparison is case-sensitive; other spellings of
+	// "off_mode" are still treated as enabled, as before.
+	mode, hasGroupCommit := allOptions["group_commit"]
+	isGroupCommitEnabled := hasGroupCommit && mode != "" && mode != "off_mode"
+
+	if isGroupCommitEnabled {
+		// Group commit is enabled, labels are not allowed
+		if cfg.Label != "" {
+			log.Warnf("Custom label '%s' specified but group_commit is enabled. Labels are not allowed with group commit, removing label.", cfg.Label)
+		}
+		if cfg.LabelPrefix != "" {
+			log.Warnf("Label prefix '%s' specified but group_commit is enabled. Labels are not allowed with group commit, removing label prefix.", cfg.LabelPrefix)
+		}
+
+		// Header keys are canonicalized, so this also removes "Label" etc.
+		req.Header.Del("label")
+		log.Infof("Group commit is enabled - all labels removed from request headers for compliance")
+		return
+	}
+
+	// Group commit is not enabled, generate and set label
+	label := generateLabel(cfg, attempt)
+	req.Header.Set("label", label)
+
+	if attempt > 0 {
+		log.Debugf("Generated retry label for attempt %d: %s", attempt, label)
+	} else {
+		log.Debugf("Generated label: %s", label)
+	}
+}
+
+// buildStreamLoadOptions merges every stream load option into one fresh map.
+//
+// Entries are applied in increasing precedence: cfg.Options first, then the
+// options reported by cfg.Format (when a format is set), and finally the
+// "group_commit" mode derived from cfg.GroupCommit. Later entries overwrite
+// earlier ones that share a key. cfg.Options itself is never modified.
+func buildStreamLoadOptions(cfg *config.Config) map[string]string {
+	merged := map[string]string{}
+
+	// User-defined options form the base layer.
+	for name, value := range cfg.Options {
+		merged[name] = value
+	}
+
+	// Format-specific options override user-defined ones.
+	if cfg.Format != nil {
+		formatOptions := cfg.Format.GetOptions()
+		for name, value := range formatOptions {
+			merged[name] = value
+		}
+	}
+
+	// Any other mode, including config.OFF, adds no group_commit entry.
+	if mode := cfg.GroupCommit; mode == config.SYNC {
+		merged["group_commit"] = "sync_mode"
+	} else if mode == config.ASYNC {
+		merged["group_commit"] = "async_mode"
+	}
+
+	return merged
+}
+
+// generateLabel creates the label for a load job, taking retries into account.
+//
+// With a custom cfg.Label:
+//   - attempt 0 returns the label verbatim;
+//   - any other attempt returns "<label>_retry_<attempt>_<millis>_<uuid8>".
+//
+// Without a custom label, the prefix is cfg.LabelPrefix, or "load" when that
+// is empty:
+//   - attempt 0 returns "<prefix>_<db>_<table>_<millis>_<uuid>";
+//   - any other attempt returns
+//     "<prefix>_<db>_<table>_<millis>_retry_<attempt>_<uuid>".
+//
+// The timestamp and UUID are generated only on the paths that use them, so the
+// verbatim-label path does no extra work.
+func generateLabel(cfg *config.Config, attempt int) string {
+	// First attempt with a custom label: use it as provided.
+	if cfg.Label != "" && attempt == 0 {
+		return cfg.Label
+	}
+
+	currentTimeMillis := time.Now().UnixMilli()
+	id := uuid.New().String()
+
+	// Retry with a custom label: add a suffix to keep the label unique.
+	if cfg.Label != "" {
+		return fmt.Sprintf("%s_retry_%d_%d_%s", cfg.Label, attempt, currentTimeMillis, id[:8])
+	}
+
+	// No custom label: build one from prefix, database and table.
+	prefix := cfg.LabelPrefix
+	if prefix == "" {
+		prefix = "load"
+	}
+
+	if attempt == 0 {
+		return fmt.Sprintf("%s_%s_%s_%d_%s", prefix, cfg.Database, cfg.Table, currentTimeMillis, id)
+	}
+	// Retry attempts: include attempt number for uniqueness
+	return fmt.Sprintf("%s_%s_%s_%d_retry_%d_%s", prefix, cfg.Database, cfg.Table, currentTimeMillis, attempt, id)
+}
diff --git a/sdk/go-doris-sdk/pkg/load/loader/resp_content.go
b/sdk/go-doris-sdk/pkg/load/loader/resp_content.go
new file mode 100644
index 00000000000..20379dd18dd
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/loader/resp_content.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package load
+
+import (
+ jsoniter "github.com/json-iterator/go"
+)
+
+// LoadResponse is the outcome of one stream load request as reported to
+// callers of StreamLoader.Load.
+type LoadResponse struct {
+ // Status is SUCCESS or FAILURE, derived from the Status field of the reply.
+ Status LoadStatus
+ // Resp is the decoded JSON body returned by the server.
+ Resp RespContent
+ // ErrorMessage is only filled in for FAILURE responses.
+ ErrorMessage string
+}
+
+// LoadStatus is the coarse outcome of a stream load request.
+type LoadStatus int
+
+// FAILURE is deliberately the zero value, so an unset LoadStatus never reads
+// as a successful load.
+const (
+	FAILURE LoadStatus = iota
+	SUCCESS
+)
+
+// String implements fmt.Stringer. It yields "SUCCESS" or "FAILURE" for the two
+// defined constants and "UNKNOWN" for every other value.
+func (s LoadStatus) String() string {
+	if s == SUCCESS {
+		return "SUCCESS"
+	}
+	if s == FAILURE {
+		return "FAILURE"
+	}
+	return "UNKNOWN"
+}
+
+// RespContent represents the response from a stream load operation.
+// Each field maps to the JSON key named in its struct tag; the tags use the
+// capitalized key names of the server reply rather than Go-style names.
+// NOTE(review): row and byte counters mix int64 and int - confirm whether the
+// narrower int fields are intentional.
+type RespContent struct {
+ TxnID int64 `json:"TxnId"`
+ Label string `json:"Label"`
+ // Status is the textual load status; isSuccessStatus decides from it whether
+ // the load counts as successful.
+ Status string `json:"Status"`
+ TwoPhaseCommit string `json:"TwoPhaseCommit"`
+ ExistingJobStatus string `json:"ExistingJobStatus"`
+ // Message and ErrorURL are combined into LoadResponse.ErrorMessage on failure.
+ Message string `json:"Message"`
+ NumberTotalRows int64 `json:"NumberTotalRows"`
+ NumberLoadedRows int64 `json:"NumberLoadedRows"`
+ NumberFilteredRows int `json:"NumberFilteredRows"`
+ NumberUnselectedRows int `json:"NumberUnselectedRows"`
+ LoadBytes int64 `json:"LoadBytes"`
+ LoadTimeMs int `json:"LoadTimeMs"`
+ BeginTxnTimeMs int `json:"BeginTxnTimeMs"`
+ StreamLoadPutTimeMs int `json:"StreamLoadPutTimeMs"`
+ ReadDataTimeMs int `json:"ReadDataTimeMs"`
+ WriteDataTimeMs int `json:"WriteDataTimeMs"`
+ CommitAndPublishTimeMs int `json:"CommitAndPublishTimeMs"`
+ ErrorURL string `json:"ErrorURL"`
+}
+
+// String renders the response content as JSON using the jsoniter configuration
+// that is compatible with the standard library. It returns the empty string
+// when encoding fails.
+func (r *RespContent) String() string {
+	encoded, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(r)
+	if err == nil {
+		return string(encoded)
+	}
+	return ""
+}
diff --git a/sdk/go-doris-sdk/pkg/load/loader/stream_loader.go
b/sdk/go-doris-sdk/pkg/load/loader/stream_loader.go
new file mode 100644
index 00000000000..9d224dc0930
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/loader/stream_loader.go
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package load
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/exception"
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/log"
+ "github.com/apache/doris/sdk/go-doris-sdk/pkg/load/util"
+
+ jsoniter "github.com/json-iterator/go"
+)
+
+// StreamLoader handles loading data into Doris via HTTP stream load.
+// It only executes prepared requests and decodes their responses.
+type StreamLoader struct {
+ // httpClient executes the stream load requests.
+ httpClient *http.Client
+ // json decodes the response body into RespContent.
+ json jsoniter.API
+}
+
+// NewStreamLoader creates a new StreamLoader.
+// The loader uses the HTTP client returned by util.GetHttpClient and the
+// jsoniter configuration that is compatible with the standard library.
+func NewStreamLoader() *StreamLoader {
+ return &StreamLoader{
+ httpClient: util.GetHttpClient(),
+ json: jsoniter.ConfigCompatibleWithStandardLibrary,
+ }
+}
+
+// Load sends the HTTP request to Doris via stream load and decodes the reply.
+//
+// A transport-level failure is logged and returned wrapped, with a nil
+// response. Otherwise the result of handleResponse is returned unchanged. The
+// response body is always closed before Load returns. Any unread remainder is
+// drained first, up to a fixed bound, because the transport only reuses a
+// keep-alive connection whose body was read to the end, and non-200 replies
+// may otherwise leave it unread.
+func (s *StreamLoader) Load(req *http.Request) (*LoadResponse, error) {
+	// Execute the request - this is the main performance bottleneck
+	log.Debugf("[TIMING] Sending HTTP request...")
+	requestStartTime := time.Now()
+	resp, err := s.httpClient.Do(req)
+	if err != nil {
+		log.Errorf("Failed to execute HTTP request: %v", err)
+		return nil, fmt.Errorf("failed to execute request: %w", err)
+	}
+	defer func() {
+		// Best effort only: a drain error just means the connection is dropped.
+		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64*1024))
+		resp.Body.Close()
+	}()
+
+	log.Debugf("[TIMING] HTTP request completed: %v", time.Since(requestStartTime))
+
+	return s.handleResponse(resp)
+}
+
+// handleResponse processes the HTTP response from a stream load request.
+//
+// For a non-200 status it returns a stream load error built from the status
+// line, followed by up to 4 KiB of the response body when one is present, so
+// the server's explanation is not lost.
+//
+// For a 200 status it reads at most 1 MiB of the body and decodes it into
+// RespContent. Read and decode failures are returned as wrapped errors with a
+// nil response. A decoded reply yields a LoadResponse with a nil error: SUCCESS
+// when isSuccessStatus accepts the reported status, otherwise FAILURE with
+// ErrorMessage built from Message and ErrorURL, or from the raw body when
+// Message is empty.
+func (s *StreamLoader) handleResponse(resp *http.Response) (*LoadResponse, error) {
+	statusCode := resp.StatusCode
+	log.Debugf("Received HTTP response with status code: %d", statusCode)
+
+	if statusCode != http.StatusOK || resp.Body == nil {
+		// For non-200 status codes, return an error that can be retried
+		log.Errorf("Stream load failed with HTTP status: %s", resp.Status)
+
+		message := fmt.Sprintf("stream load error: %s", resp.Status)
+		if resp.Body != nil {
+			// Best effort: keep the original message if the body is unreadable.
+			if snippet, readErr := io.ReadAll(io.LimitReader(resp.Body, 4096)); readErr == nil {
+				if detail := strings.TrimSpace(string(snippet)); detail != "" {
+					message += ": " + detail
+				}
+			}
+		}
+		return nil, exception.NewStreamLoadError(message)
+	}
+
+	// Read the response body with limited buffer (1MB limit)
+	body, err := io.ReadAll(io.LimitReader(resp.Body, 1024*1024))
+	if err != nil {
+		log.Errorf("Failed to read response body: %v", err)
+		return nil, fmt.Errorf("failed to read response body: %w", err)
+	}
+
+	log.Infof("Stream Load Response: %s", string(body))
+
+	// Parse the response
+	var respContent RespContent
+	if err := s.json.Unmarshal(body, &respContent); err != nil {
+		log.Errorf("Failed to unmarshal JSON response: %v", err)
+		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+	}
+
+	// Check status and return result
+	if isSuccessStatus(respContent.Status) {
+		log.Infof("Load operation completed successfully")
+		return &LoadResponse{
+			Status: SUCCESS,
+			Resp:   respContent,
+		}, nil
+	}
+
+	log.Errorf("Load operation failed with status: %s", respContent.Status)
+	errorMessage := string(body)
+	if respContent.Message != "" {
+		errorMessage = fmt.Sprintf("load failed. cause by: %s, please check more detail from url: %s",
+			respContent.Message, respContent.ErrorURL)
+	}
+
+	return &LoadResponse{
+		Status:       FAILURE,
+		Resp:         respContent,
+		ErrorMessage: errorMessage,
+	}, nil
+}
+
+// isSuccessStatus reports whether a stream load Status value means the data
+// was committed. The comparison ignores case.
+//
+// Besides "Success", Doris reports "Publish Timeout" for a load that was
+// committed but whose data may become visible with a delay. It must count as
+// success: a retry would run under a new label and load the same rows again.
+func isSuccessStatus(status string) bool {
+	return strings.EqualFold(status, "success") ||
+		strings.EqualFold(status, "publish timeout")
+}
diff --git a/sdk/go-doris-sdk/pkg/load/util/http_client.go
b/sdk/go-doris-sdk/pkg/load/util/http_client.go
new file mode 100644
index 00000000000..c1a3802ce2b
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/util/http_client.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package util
+
+import (
+ "crypto/tls"
+ "net/http"
+ "sync"
+ "time"
+)
+
+var (
+ // client is the shared HTTP client handed out by GetHttpClient.
+ client *http.Client
+ // once makes sure client is built a single time.
+ once sync.Once
+)
+
+// GetHttpClient returns the shared HTTP client of the SDK.
+// The client is built by buildHttpClient on the first call, guarded by a
+// sync.Once, and every later call returns that same instance.
+func GetHttpClient() *http.Client {
+ once.Do(func() {
+ client = buildHttpClient()
+ })
+ return client
+}
+
+// buildHttpClient constructs the HTTP client used for stream load requests.
+//
+// The transport bounds the per-host connection pool, expires idle connections
+// and waits briefly for "100 Continue". The client applies a 120 second limit
+// to each request as a whole.
+func buildHttpClient() *http.Client {
+	transport := &http.Transport{
+		MaxIdleConnsPerHost: 30, // Maximum idle connections per host for connection reuse to reduce overhead
+		MaxConnsPerHost:     50, // Maximum total connections (active + idle) per host, controls concurrency, excess will queue
+		MaxIdleConns:        50, // Global maximum idle connections
+
+		// Zero would keep idle connections open indefinitely.
+		IdleConnTimeout: 90 * time.Second,
+
+		// Stream load requests carry "Expect: 100-continue". With a zero
+		// timeout the transport sends the body at once without waiting for
+		// the server, which defeats that header.
+		ExpectContinueTimeout: 1 * time.Second,
+
+		// NOTE(security): certificate verification is disabled, so HTTPS
+		// endpoints are not authenticated and traffic, credentials included,
+		// can be intercepted. Kept for compatibility with self-signed
+		// deployments; consider making it configurable.
+		TLSClientConfig: &tls.Config{
+			InsecureSkipVerify: true, // Allow insecure connections for Doris HTTP endpoints
+		},
+	}
+
+	return &http.Client{
+		Transport: transport,
+		Timeout:   120 * time.Second, // Total request timeout
+	}
+}
diff --git a/sdk/go-doris-sdk/pkg/load/util/http_client_test.go
b/sdk/go-doris-sdk/pkg/load/util/http_client_test.go
new file mode 100644
index 00000000000..05f3f3cde6a
--- /dev/null
+++ b/sdk/go-doris-sdk/pkg/load/util/http_client_test.go
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package util
+
+import (
+ "crypto/tls"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "sync"
+ "testing"
+ "time"
+)
+
+// createTestClient returns an HTTP client whose transport uses the given
+// per-host limits: maxIdleConnsPerHost sizes the idle pool and maxConnsPerHost
+// caps concurrent connections (further requests queue). The global idle limit
+// is 50, certificate verification is off and the request timeout is 30s.
+func createTestClient(maxIdleConnsPerHost, maxConnsPerHost int) *http.Client {
+	tlsCfg := &tls.Config{InsecureSkipVerify: true}
+
+	tr := &http.Transport{TLSClientConfig: tlsCfg}
+	tr.MaxIdleConnsPerHost = maxIdleConnsPerHost // idle pool size, affects reuse
+	tr.MaxConnsPerHost = maxConnsPerHost         // concurrency cap, excess queues
+	tr.MaxIdleConns = 50                         // global idle connection limit
+
+	testClient := &http.Client{Timeout: 30 * time.Second}
+	testClient.Transport = tr
+	return testClient
+}
+
+// TestHTTPClientConcurrencyLimits tests the behavior when concurrent requests
+// exceed connection limits.
+//
+// Seven requests are sent at once to a server that answers after 2 seconds,
+// through a client limited to 3 connections per host. The timings are logged
+// and analysed by analyzeConnectionBehavior. The test fails when a request
+// errors or returns a non-200 status. Timing expectations are only reported,
+// because they depend on machine load. The run needs about 6 seconds, so it is
+// skipped under -short.
+func TestHTTPClientConcurrencyLimits(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping: needs about 6s of wall-clock time to observe connection queuing")
+	}
+
+	// Create a test server that responds after 2 seconds
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		// Fixed 2-second delay for each request
+		time.Sleep(2 * time.Second)
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte("OK"))
+	}))
+	defer server.Close()
+
+	// Create HTTP client with connection limits
+	// MaxIdleConnsPerHost: 2, MaxConnsPerHost: 3
+	client := createTestClient(2, 3)
+
+	// Test configuration
+	numRequests := 7
+	results := make([]requestResult, numRequests)
+	var wg sync.WaitGroup
+
+	t.Logf("Testing with MaxIdleConnsPerHost=2, MaxConnsPerHost=3")
+	t.Logf("Sending %d concurrent requests to server with 2s response delay", numRequests)
+	t.Logf("Expected behavior:")
+	t.Logf("- First 3 requests should start immediately (within connection limit)")
+	t.Logf("- Remaining 4 requests should queue and wait for connections to be available")
+	t.Logf("- First 3 requests should complete around 2s")
+	t.Logf("- Next batch should complete around 4s")
+	t.Logf("- Final batch should complete around 6s")
+
+	testStartTime := time.Now()
+
+	// Launch concurrent requests; each goroutine writes only its own slot.
+	for i := 0; i < numRequests; i++ {
+		wg.Add(1)
+		go func(requestID int) {
+			defer wg.Done()
+
+			startTime := time.Now()
+			resp, err := client.Get(server.URL)
+			endTime := time.Now()
+
+			if resp != nil {
+				defer resp.Body.Close()
+				// A reply with the wrong status is a failed request as well.
+				if err == nil && resp.StatusCode != http.StatusOK {
+					err = fmt.Errorf("unexpected HTTP status: %s", resp.Status)
+				}
+			}
+
+			results[requestID] = requestResult{
+				ID:        requestID + 1,
+				StartTime: startTime.Sub(testStartTime),
+				EndTime:   endTime.Sub(testStartTime),
+				Duration:  endTime.Sub(startTime),
+				Success:   err == nil && resp != nil,
+				Error:     err,
+			}
+		}(i)
+	}
+
+	// Wait for all requests to complete
+	wg.Wait()
+
+	// Analyze and display results
+	t.Logf("\n=== Test Results ===")
+	t.Logf("Request | Start Time | End Time | Duration | Success")
+	t.Logf("--------|------------|------------|------------|--------")
+
+	for _, result := range results {
+		status := "OK"
+		if !result.Success {
+			status = fmt.Sprintf("FAIL: %v", result.Error)
+			t.Errorf("request %d failed: %v", result.ID, result.Error)
+		}
+
+		t.Logf(" %d | %8s | %8s | %8s | %s",
+			result.ID,
+			formatDuration(result.StartTime),
+			formatDuration(result.EndTime),
+			formatDuration(result.Duration),
+			status)
+	}
+
+	// Analyze connection behavior
+	analyzeConnectionBehavior(t, results)
+}
+
+// requestResult records the timing and outcome of one request issued by
+// TestHTTPClientConcurrencyLimits.
+type requestResult struct {
+ ID int
+ StartTime time.Duration // relative to test start
+ EndTime time.Duration // relative to test start
+ Duration time.Duration // request duration
+ Success bool
+ Error error
+}
+
+// formatDuration renders d as seconds with two decimals and an "s" suffix,
+// for example "1.50s".
+func formatDuration(d time.Duration) string {
+	seconds := d.Seconds()
+	return fmt.Sprintf("%.2fs", seconds)
+}
+
+// analyzeConnectionBehavior logs how the request durations in results compare
+// with the queuing expected from a 3-connection limit and a 2 second server
+// delay: 3 fast requests (under 2.5s) and 4 slow ones (over 3.5s).
+//
+// Only successful requests are considered. The function reports through
+// t.Logf and never fails the test itself. When no request succeeded it says so
+// and does not print a duration range, which would be meaningless.
+func analyzeConnectionBehavior(t *testing.T, results []requestResult) {
+	t.Logf("\n=== Connection Behavior Analysis ===")
+
+	fastRequests := 0 // Completed within 2.5s (likely immediate connections)
+	slowRequests := 0 // Completed after 3.5s (likely queued)
+	succeeded := 0
+	var minDuration, maxDuration time.Duration
+
+	for _, result := range results {
+		if !result.Success {
+			continue
+		}
+
+		switch {
+		case result.Duration < 2500*time.Millisecond:
+			fastRequests++
+		case result.Duration > 3500*time.Millisecond:
+			slowRequests++
+		}
+
+		// Seed the minimum from the first success, not from a sentinel.
+		if succeeded == 0 || result.Duration < minDuration {
+			minDuration = result.Duration
+		}
+		if result.Duration > maxDuration {
+			maxDuration = result.Duration
+		}
+		succeeded++
+	}
+
+	t.Logf("Fast requests (< 2.5s): %d (expected: 3 - within connection limit)", fastRequests)
+	t.Logf("Slow requests (> 3.5s): %d (expected: 4 - queued requests)", slowRequests)
+
+	// Verify expectations
+	if fastRequests == 3 && slowRequests == 4 {
+		t.Logf("✅ Connection pool behavior matches expectations!")
+		t.Logf(" - MaxConnsPerHost=3 allowed 3 requests to proceed immediately")
+		t.Logf(" - Remaining 4 requests were queued and waited for connections")
+	} else {
+		t.Logf("❌ Unexpected connection pool behavior")
+		t.Logf(" - Expected 3 fast requests and 4 slow requests")
+		t.Logf(" - Got %d fast requests and %d slow requests", fastRequests, slowRequests)
+	}
+
+	// Additional insights
+	if succeeded == 0 {
+		t.Logf("\nDuration range: n/a (no successful requests)")
+		return
+	}
+
+	t.Logf("\nDuration range: %s - %s", formatDuration(minDuration), formatDuration(maxDuration))
+
+	if maxDuration > minDuration+1500*time.Millisecond {
+		t.Logf("✅ Significant duration difference indicates connection queuing is working")
+	} else {
+		t.Logf("⚠️ Small duration difference - connection limits might not be effective")
+	}
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]