This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 62dc78fbb00 [fix](logstash) Replace HttpClient5 async with HttpClient4 sync to fix CircularRedirectException (1.2.0 -> 1.2.1) (#63181)
62dc78fbb00 is described below

commit 62dc78fbb00b1e8617f8a379ca9dd5dc250a32c1
Author: bingquanzhao <[email protected]>
AuthorDate: Mon Jun 8 12:07:25 2026 +0800

    [fix](logstash) Replace HttpClient5 async with HttpClient4 sync to fix CircularRedirectException (1.2.0 -> 1.2.1) (#63181)
    
    Replace HttpClient5 async with HttpClient4 sync to fix
    CircularRedirectException (1.2.0 -> 1.2.1)
    
    The logstash-output-doris plugin uses Apache HttpClient5 async client to
    PUT stream load requests. Against SelectDB Cloud / BYOC FE — which
    returns '307 + Connection: close' on stream load — the async client
    fails with CircularRedirectException under any meaningful concurrency /
    body size.
    
    Root cause:
    1. HC5 async does not strictly block body transmission while waiting for
    '100 Continue'. When FE returns 307 before issuing 100, the entity
    producer has already started writing; FE closing the connection then
    yields an IOException mid-transfer.
    2. HC5 default exec chain wraps RedirectExec around
    AsyncHttpRequestRetryExec. The recoverable IOException triggers an
    internal retry that re-enters the same FE -> 307 path, but
    RedirectLocations from the first attempt is still populated, so the same
    BE URL is detected as 'already visited' and reported as a circular
    redirect.
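
    The second step can be illustrated with a toy model in plain Ruby. This
    is only a sketch of the failure mode as described above, not HttpClient
    code: every class, method and URL in it is hypothetical, and the
    `visited` set merely stands in for the redirect history that survives
    the internal retry.

```ruby
require 'set'

# Toy model only: none of these names are HttpClient classes.
class CircularRedirectError < StandardError; end

# Follow an FE -> BE redirect. `visited` plays the role of the redirect
# history kept for the request.
def follow_redirect(location, visited)
  raise CircularRedirectError, "circular redirect to #{location}" if visited.include?(location)
  visited << location
  location
end

# One attempt: FE answers 307 pointing at the BE; when `fail_io` is true
# the connection drops mid-transfer (modelled as an IOError).
def attempt(be_url, visited, fail_io)
  target = follow_redirect(be_url, visited)
  raise IOError, 'connection closed mid-transfer' if fail_io
  "loaded via #{target}"
end

# First attempt always fails with an IOError, then one retry is made.
# `reset_history_on_retry` decides whether the retry starts with a clean
# redirect history or reuses the one from the first attempt.
def stream_load(be_url, reset_history_on_retry)
  visited = Set.new
  begin
    attempt(be_url, visited, true)
  rescue IOError
    visited = Set.new if reset_history_on_retry
    attempt(be_url, visited, false)
  end
end
```

    With a stale history the retry raises `CircularRedirectError` even
    though the BE URL was only ever redirected to legitimately; with a
    fresh history the same retry succeeds.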
    
    This is a real HC5-vs-HC4 implementation difference, not a configuration
    issue. The Doris Flink connector also follows FE 307 to BE in its
    default path (autoRedirect=true) and works correctly precisely because
    it uses HC4 sync: HC4 honors 'Expect: 100-continue' strictly, so when FE
    307s without sending 100, the entity is left unconsumed and HC4's
    RedirectExec follows the redirect normally.
    
    This patch aligns the plugin with the Flink connector's HTTP layer:
      - bump gem version 1.2.0 -> 1.2.1
      - httpclient5 5.4.2 (async)  ->  httpclient 4.5.13 (sync)
      - SimpleRequestBuilder -> HttpPut + ByteArrayEntity (repeatable)
      - HttpAsyncClients defaults  ->  HttpClients with:
          * setRequestExecutor(HttpRequestExecutor(60s))
          * setRedirectStrategy(DorisRedirectStrategy) (isRedirectable=true,
            strip userinfo, normalize empty query)
          * setRetryHandler(DefaultHttpRequestRetryHandler(0, false))
          * setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
          * RequestConfig.setExpectContinueEnabled(true)
      - Async future plumbing in TableEvents replaced with sync
        response_code / response_body / response_error fields.
      - Stringify both key and value at request.addHeader call site: HC4's
        addHeader(String, String) is strict on types whereas HC5 had a
        permissive (String, Object) overload; user configs commonly carry
        Float / Integer values like 'max_filter_ratio => 1.0'.
      - Drop 's.requirements << jar ...' from gemspec: with JARs vendored
        under lib/, the maven lookup at install time is unnecessary and
        forced users to set JARS_SKIP=true for offline installs.
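
    Two of the smaller items above (the redirect URI clean-up and the
    header stringification) can be sketched in plain Ruby without the Java
    classes. The helper names and the sample URL are hypothetical; the
    plugin itself does this inside DorisRedirectStrategy and at the
    request.addHeader call site.

```ruby
require 'uri'

# Hypothetical helper: rebuild a redirect location without userinfo and
# treat a bare "?" (empty query) as no query at all.
def normalize_redirect(location)
  uri = URI.parse(location)
  query = uri.query
  query = nil if query == ''
  out = "#{uri.scheme}://#{uri.host}"
  out += ":#{uri.port}" if uri.port
  out += uri.path
  out += "?#{query}" if query
  out += "##{uri.fragment}" if uri.fragment
  out
end

# Hypothetical helper: turn every header key and value into a String so a
# strictly typed (String, String) header API accepts them.
def stringify_headers(headers)
  headers.each_with_object({}) { |(k, v), out| out[k.to_s] = v.to_s }
end
```

    For example, a pipeline setting such as `max_filter_ratio => 1.0`
    arrives as a Float and leaves `stringify_headers` as the string "1.0".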
    
    Pipeline configuration, retry queue, save_on_failure, group_commit,
    label generation, header handling - all unchanged.
    
    Verified on a SelectDB BYOC cluster mirroring the reported production
    shape (16 workers x 10000 batch x 200,000 events):
      - Before: 100% requests fail with CircularRedirectException
      - After: 20/20 stream loads Status=Success, 200,000/200,000 rows
        ingested, 0 HTTP-layer errors.
---
 extension/logstash/README.md                     | 50 +++++++++++--
 extension/logstash/lib/logstash/outputs/doris.rb | 89 ++++++++++++++++++------
 extension/logstash/logstash-output-doris.gemspec |  4 +-
 3 files changed, 116 insertions(+), 27 deletions(-)

diff --git a/extension/logstash/README.md b/extension/logstash/README.md
index 34feb45db26..c29d4c1ca40 100644
--- a/extension/logstash/README.md
+++ b/extension/logstash/README.md
@@ -17,12 +17,52 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-1. How to build
+# logstash-output-doris
 
-       `gem build logstash-output-doris.gemspec`
+A Logstash output plugin that ships events to Doris via stream load.
 
-2. How to use
+Docs: <https://doris.apache.org/docs/dev/ecosystem/logstash>
 
-   `https://doris.apache.org/zh-CN/docs/dev/ecosystem/logstash`
-   `https://doris.apache.org/docs/dev/ecosystem/logstash`
+## Build
 
+Prerequisites: JRuby (>= 9.4 with Java 21, or 9.2 with Java 8/11) and the
+`jar-dependencies` gem (`jruby -S gem install jar-dependencies`).
+
+```bash
+# 1. Vendor HttpClient4 + transitive jars into lib/ and generate the loader.
+#    Reads the 'jar' entry from logstash-output-doris.gemspec.
+jruby -e "require 'jars/installer'; Jars::Installer.new.vendor_jars"
+
+# 2. Build the gem.
+jruby -S gem build logstash-output-doris.gemspec
+```
+
+Produces `logstash-output-doris-<version>-java.gem`.
+
+## Install
+
+The jars are already vendored inside the gem, so the install hook does not
+need to talk to Maven — pass `JARS_SKIP=true` to skip the lookup:
+
+```bash
+JARS_SKIP=true $LS_HOME/bin/logstash-plugin install \
+    logstash-output-doris-<version>-java.gem
+```
+
+### Air-gapped install (offline pack)
+
+For a Logstash host without internet access, build an offline pack on a
+connected host first:
+
+```bash
+# On a connected host (same Logstash major version as the target):
+JARS_SKIP=true $LS_HOME/bin/logstash-plugin install \
+    logstash-output-doris-<version>-java.gem
+$LS_HOME/bin/logstash-plugin prepare-offline-pack \
+    --output logstash-output-doris-<version>-offline-pack.zip \
+    logstash-output-doris
+
+# Then on the air-gapped target:
+$LS_HOME/bin/logstash-plugin install \
+    file:///path/to/logstash-output-doris-<version>-offline-pack.zip
+```
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index 86922036c58..b1e28eb4ae2 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -33,9 +33,14 @@ require 'java'
 require "#{File.dirname(__FILE__)}/../../logstash-output-doris_jars.rb"
 
 class LogStash::Outputs::Doris < LogStash::Outputs::Base
-   include_package 'org.apache.hc.client5.http.impl.async'
-   include_package 'org.apache.hc.client5.http.async.methods'
-   include_package 'org.apache.hc.core5.http'
+   java_import 'org.apache.http.client.methods.HttpPut'
+   java_import 'org.apache.http.entity.ByteArrayEntity'
+   java_import 'org.apache.http.util.EntityUtils'
+   java_import 'org.apache.http.impl.client.HttpClients'
+   java_import 'org.apache.http.impl.client.DefaultHttpRequestRetryHandler'
+   java_import 'org.apache.http.impl.NoConnectionReuseStrategy'
+   java_import 'org.apache.http.protocol.HttpRequestExecutor'
+   java_import 'org.apache.http.client.config.RequestConfig'
 
    # support multi thread concurrency for performance
    # so multi_receive() and function it calls are all stateless and thread safe
@@ -96,11 +101,19 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       :http_hosts => @http_hosts)
    end
 
-   class DorisRedirectStrategy < Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy
+   class DorisRedirectStrategy < Java::org.apache.http.impl.client.DefaultRedirectStrategy
+      # allow redirect for all methods (PUT/POST) - HC4 only allows GET/HEAD by default
+      def isRedirectable(method)
+         true
+      end
+
       def getLocationURI(request, response, context)
          uri = super(request, response, context)
          # remove user info in redirect uri
-         java.net.URI.new(uri.getScheme, nil, uri.getHost, uri.getPort, uri.getPath, uri.getQuery, uri.getFragment)
+         # normalize empty query string ("?" with nothing after) to no query at all
+         query = uri.getQuery
+         query = nil if query == ""
+         java.net.URI.new(uri.getScheme, nil, uri.getHost, uri.getPort, uri.getPath, query, uri.getFragment)
       end
    end
 
@@ -109,8 +122,21 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    end
 
    def register
-      @client = HttpAsyncClients.custom.setRedirectStrategy(DorisRedirectStrategy.new).build
-      @client.start
+      # HttpClient 4.5.13 sync — same setup as Doris Flink connector (HttpUtil.java)
+      # Key points:
+      #   - setRequestExecutor(60s)        : long wait for 100-continue, FE may delay 307 under load
+      #   - setRedirectStrategy            : follow 307 on PUT (default DefaultRedirectStrategy refuses)
+      #   - DefaultHttpRequestRetryHandler(0, false) : NO retries -> avoid spurious CircularRedirect
+      #   - NoConnectionReuseStrategy      : one connection per request, dodge keep-alive half-close
+      #   - setExpectContinueEnabled(true) : critical -> HC4 waits for 100, FE 307s before body is sent,
+      #                                      entity stays unconsumed, RedirectExec follows successfully
+      @client = HttpClients.custom
+         .setRequestExecutor(HttpRequestExecutor.new(60_000))
+         .setRedirectStrategy(DorisRedirectStrategy.new)
+         .setRetryHandler(DefaultHttpRequestRetryHandler.new(0, false))
+         .setConnectionReuseStrategy(NoConnectionReuseStrategy::INSTANCE)
+         .setDefaultRequestConfig(RequestConfig.custom.setExpectContinueEnabled(true).build)
+         .build
 
       @request_headers = make_request_headers
       @logger.info("request headers: ", @request_headers)
@@ -262,11 +288,11 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
 
          response = ""
          if stat == STAT_SUCCESS
-            begin
-               response = table_events.response_future.get.getBodyText
-            rescue => e
-               log_failure("doris stream load request error: #{e}")
+            if table_events.response_error
+               log_failure("doris stream load request error: #{table_events.response_error}")
                stat = STAT_RETRY
+            else
+               response = table_events.response_body
             end
          end
 
@@ -339,15 +365,33 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
          end
          @logger.debug("doris stream load request body: #{table_events.documents}")
 
-         request = SimpleRequestBuilder.
-            put(url).
-            setBody(table_events.documents, ContentType::TEXT_PLAIN).
-            build
+         request = HttpPut.new(url)
+         # ByteArrayEntity: known content-length, repeatable -> safe across 307 retries.
+         # Combined with Expect:100-continue at the client config level, body is held back
+         # until FE decides; FE 307 short-circuits without consuming the entity.
+         request.setEntity(ByteArrayEntity.new(table_events.documents.to_java_bytes))
+         # HC4's addHeader(String, String) is strict on types — users commonly write
+         # non-string values like `max_filter_ratio => 1.0` (Float) or `timeout => 1200`
+         # (Integer) in their pipeline config. HC5 tolerated this via an (String, Object)
+         # overload; HC4 does not. Stringify both sides to keep that ergonomics.
          table_events.http_headers.each do |k, v|
-            request.addHeader(k, v)
+            request.addHeader(k.to_s, v.to_s)
          end
 
-         table_events.response_future = @client.execute(request, nil)
+         # Sync execute. Capture body + status into table_events so handle_request stays
+         # mostly unchanged. Exceptions are stashed for the same flow to interpret as RETRY.
+         begin
+            response = @client.execute(request)
+            begin
+               table_events.response_code = response.getStatusLine.getStatusCode
+               entity = response.getEntity
+               table_events.response_body = entity.nil? ? "" : EntityUtils.toString(entity)
+            ensure
+               response.close
+            end
+         rescue => e
+            table_events.response_error = e
+         end
       end
    end # def make_request
 
@@ -423,7 +467,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    end
 
    class TableEvents
-      attr_accessor :table, :http_headers, :events, :events_count, :documents, :req_count, :response_future
+      attr_accessor :table, :http_headers, :events, :events_count, :documents, :req_count,
+                    :response_code, :response_body, :response_error
 
       def initialize(table, http_headers)
          @table = table
@@ -434,12 +479,16 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
          @documents = ""
          @req_count = 1
 
-         @response_future = nil
+         @response_code = nil
+         @response_body = nil
+         @response_error = nil
       end
 
       def prepare_retry
          @req_count += 1
-         @response_future = nil
+         @response_code = nil
+         @response_body = nil
+         @response_error = nil
       end
    end
 
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
index ba94518d279..daa23cdb39f 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,7 +18,7 @@ under the License.
 =end
 Gem::Specification.new do |s|
   s.name            = 'logstash-output-doris'
-  s.version         = '1.2.0'
+  s.version         = '1.2.1'
   s.author          = 'Apache Doris'
   s.email           = '[email protected]'
   s.homepage        = 'http://doris.apache.org'
@@ -46,5 +46,5 @@ Gem::Specification.new do |s|
   s.add_development_dependency 'sinatra', '~> 1.4'
   s.add_development_dependency 'webrick', '~> 1.9'
 
-  s.requirements << 'jar org.apache.httpcomponents.client5, httpclient5, 5.4.2'
+  s.requirements << 'jar org.apache.httpcomponents, httpclient, 4.5.13'
 end

