This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c70f0875572 [enhancement](plugin) logstash: support multi-table 
(#48040)
c70f0875572 is described below

commit c70f08755727a1542988e921cc457c6d8b73ca45
Author: Mingxi <[email protected]>
AuthorDate: Wed May 14 11:44:23 2025 +0800

    [enhancement](plugin) logstash: support multi-table (#48040)
    
    Problem Summary:
    
    Support doris logstash output plugin to write multiple tables.
    
    Use the Apache async HTTP client (based on NIO) to replace the Ruby rest-client
    
    **Important**: Because the jar dependencies are added and are already
    included in the gem package, we need to run `export
    JARS_SKIP="true"` before installing it into Logstash, to prevent the jars
    from being looked up in the local Maven repository.
---
 extension/logstash/.gitignore                      |   8 +
 extension/logstash/lib/logstash/outputs/doris.rb   | 269 +++++++++++++++------
 .../logstash/lib/logstash/util/delay_event.rb      |  18 +-
 extension/logstash/logstash-output-doris.gemspec   |   8 +-
 4 files changed, 208 insertions(+), 95 deletions(-)

diff --git a/extension/logstash/.gitignore b/extension/logstash/.gitignore
new file mode 100644
index 00000000000..8a60245fb73
--- /dev/null
+++ b/extension/logstash/.gitignore
@@ -0,0 +1,8 @@
+/lib/com
+/lib/org
+/lib/logstash-output-doris_jars.rb
+
+Gemfile.lock
+
+logstash-output-doris-*.gem
+logstash-output-doris-*.zip
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb 
b/extension/logstash/lib/logstash/outputs/doris.rb
index 971d28889c4..86922036c58 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -27,11 +27,16 @@ require "uri"
 require "securerandom"
 require "json"
 require "base64"
-require "restclient"
 require 'thread'
 
+require 'java'
+require "#{File.dirname(__FILE__)}/../../logstash-output-doris_jars.rb"
 
 class LogStash::Outputs::Doris < LogStash::Outputs::Base
+   include_package 'org.apache.hc.client5.http.impl.async'
+   include_package 'org.apache.hc.client5.http.async.methods'
+   include_package 'org.apache.hc.core5.http'
+
    # support multi thread concurrency for performance
    # so multi_receive() and function it calls are all stateless and thread safe
    concurrency :shared
@@ -44,6 +49,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    config :db, :validate => :string, :required => true
    # the table which data is loaded to
    config :table, :validate => :string, :required => true
+   # default table
+   config :default_table, :validate => :string, :default => ""
    # label prefix of a stream load request.
    config :label_prefix, :validate => :string, :default => "logstash"
    # user name
@@ -89,8 +96,21 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       :http_hosts => @http_hosts)
    end
 
+   class DorisRedirectStrategy < 
Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy
+      def getLocationURI(request, response, context)
+         uri = super(request, response, context)
+         # remove user info in redirect uri
+         java.net.URI.new(uri.getScheme, nil, uri.getHost, uri.getPort, 
uri.getPath, uri.getQuery, uri.getFragment)
+      end
+   end
+
+   def http_query(table)
+      "/api/#{@db}/#{table}/_stream_load"
+   end
+
    def register
-      @http_query = "/api/#{@db}/#{@table}/_stream_load"
+      @client = 
HttpAsyncClients.custom.setRedirectStrategy(DorisRedirectStrategy.new).build
+      @client.start
 
       @request_headers = make_request_headers
       @logger.info("request headers: ", @request_headers)
@@ -143,18 +163,24 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       # retry queue size in bytes
       @retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
       retry_thread = Thread.new do
-         while popped = @retry_queue.take
-            documents, http_headers, event_num, req_count = popped.event
-            handle_request(documents, http_headers, event_num, req_count)
+         while (popped = @retry_queue.take)
+            table_events_map = popped.event
+            handle_request(table_events_map)
          end
       end
 
-      print_plugin_info()
+      @const_table = @table.index("%").nil?
+
+      print_plugin_info
    end # def register
 
    private
    def add_event_to_retry_queue(delay_event)
-      event_size = delay_event.documents.size
+      event_size = 0
+      delay_event.event.each do |_, table_events|
+         event_size += table_events.documents.size
+      end
+
       if delay_event.first_retry
          while @retry_queue_bytes.get + event_size > @max_retry_queue_mb * 
1024 * 1024
             sleep(1)
@@ -169,21 +195,47 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       send_events(events)
    end
 
-   private
-   def send_events(events)
-      documents = events.map { |event| event_body(event) }.join("\n")
-      event_num = events.size
-
-      # @logger.info("get event num: #{event_num}")
-      @logger.debug("get documents: #{documents}")
-
+   def create_http_headers(table)
       http_headers = @request_headers.dup
       if !@group_commit
          # only set label if group_commit is off_mode or not set, since lable 
can not be used with group_commit
-         http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + 
"_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+         http_headers["label"] = @label_prefix + "_" + @db + "_" + table + "_" 
+ Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+      end
+      http_headers
+   end
+
+   private
+   def send_events(events)
+      table_events_map = Hash.new
+      if @const_table
+         table_events = TableEvents.new(@table, create_http_headers(@table))
+         table_events.events = events
+         table_events_map[@table] = table_events
+      else
+         events.each do |event|
+            table = event.sprintf(@table)
+            if table == "" || !table.index("%").nil?
+               table = @default_table
+               if table == ""
+                  @logger.warn("table format error, the default table is not 
set, the data will be dropped")
+               else
+                  @logger.warn("table format error, use the default table: 
#{table}")
+               end
+            end
+            table_events = table_events_map[table]
+            if table_events == nil
+               table_events = TableEvents.new(table, 
create_http_headers(table))
+               table_events_map[table] = table_events
+            end
+            table_events.events << event
+         end
       end
 
-      handle_request(documents, http_headers, event_num, 1)
+      table_events_map.each do |_, table_events|
+         serialize(table_events)
+      end
+
+      handle_request(table_events_map)
    end
 
    def sleep_for_attempt(attempt)
@@ -192,82 +244,111 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       (sleep_for/2) + (rand(0..sleep_for)/2)
    end
 
+   STAT_SUCCESS = 0
+   STAT_FAIL = 1
+   STAT_RETRY = 2
+
    private
-   def handle_request(documents, http_headers, event_num, req_count)
-      response = make_request(documents, http_headers, @http_query, 
@http_hosts.sample)
-      response_json = {}
-      begin
-         response_json = JSON.parse(response.body)
-      rescue => _
-         @logger.warn("doris stream load response is not a valid 
JSON:\n#{response}")
-      end
+   def handle_request(table_events_map)
+      make_request(table_events_map)
+      retry_map = Hash.new
+      table_events_map.each do |table, table_events|
+         stat = STAT_SUCCESS
+
+         if table == ""
+            @logger.warn("drop #{table_events.events_count} records because of 
empty table")
+            stat = STAT_FAIL
+         end
 
-      status = response_json["Status"]
+         response = ""
+         if stat == STAT_SUCCESS
+            begin
+               response = table_events.response_future.get.getBodyText
+            rescue => e
+               log_failure("doris stream load request error: #{e}")
+               stat = STAT_RETRY
+            end
+         end
 
-      need_retry = true
+         response_json = {}
+         if stat == STAT_SUCCESS
+            begin
+               response_json = JSON.parse(response)
+            rescue => _
+               @logger.warn("doris stream load response is not a valid 
JSON:\n#{response}")
+               stat = STAT_RETRY
+            end
+         end
 
-      if status == 'Label Already Exists'
-         @logger.warn("Label already exists: #{response_json['Label']}, skip 
#{event_num} records:\n#{response}")
-         need_retry = false
+         if stat == STAT_SUCCESS
+            status = response_json["Status"]
+
+            if status == 'Label Already Exists'
+               @logger.warn("Label already exists: #{response_json['Label']}, 
skip #{table_events.events_count} records:\n#{response}")
+
+            elsif status == "Success" || status == "Publish Timeout"
+               @total_bytes.addAndGet(table_events.documents.size)
+               @total_rows.addAndGet(table_events.events_count)
+               if @log_request or @logger.debug?
+                  @logger.info("doris stream load response:\n#{response}")
+               end
+
+            else
+               @logger.warn("FAILED doris stream load response:\n#{response}")
+               if @max_retries >= 0 && table_events.req_count - 1 >= 
@max_retries
+                  @logger.warn("DROP this batch after failed 
#{table_events.req_count} times.")
+                  stat = STAT_FAIL
+               else
+                  stat = STAT_RETRY
+               end
+            end
+         end
 
-      elsif status == "Success" || status == "Publish Timeout"
-         @total_bytes.addAndGet(documents.size)
-         @total_rows.addAndGet(event_num)
-         if @log_request or @logger.debug?
-            @logger.info("doris stream load response:\n#{response}")
+         if stat == STAT_FAIL && @save_on_failure
+            @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{table}_#{@save_file}")
+            save_to_disk(table_events.documents, table)
          end
-         need_retry = false
-
-      elsif @max_retries >= 0 && req_count - 1 > @max_retries
-         @logger.warn("FAILED doris stream load response:\n#{response}")
-         @logger.warn("DROP this batch after failed #{req_count} times.")
-         if @save_on_failure
-            @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{@table}_#{@save_file}")
-            save_to_disk(documents)
+
+         if stat != STAT_RETRY && table_events.req_count > 1
+            @retry_queue_bytes.addAndGet(-table_events.documents.size)
          end
-         need_retry = false
-      end
 
-      if !need_retry
-         if req_count > 1
-            @retry_queue_bytes.addAndGet(-documents.size)
+         if stat == STAT_RETRY
+            table_events.prepare_retry
+            retry_map[table] = table_events
          end
-         return
       end
 
-      # add to retry_queue
-      sleep_for = sleep_for_attempt(req_count)
-      @logger.warn("FAILED doris stream load response:\n#{response}")
-      @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.")
-      delay_event = DelayEvent.new(sleep_for, [documents, http_headers, 
event_num, req_count+1])
-      add_event_to_retry_queue(delay_event)
+      if retry_map.size > 0
+         # add to retry_queue
+         req_count = retry_map.values[0].req_count
+         sleep_for = sleep_for_attempt(req_count)
+         @logger.warn("Will do the #{req_count-1}th retry after #{sleep_for} 
secs.")
+         delay_event = DelayEvent.new(sleep_for, retry_map)
+         add_event_to_retry_queue(delay_event)
+      end
    end
 
    private
-   def make_request(documents, http_headers, query, host)
-      url = host + query
+   def make_request(table_events_map)
+      table_events_map.each do |table, table_events|
+         url = @http_hosts.sample + http_query(table)
 
-      if @log_request or @logger.debug?
-         @logger.info("doris stream load request url: #{url}  headers: 
#{http_headers}  body size: #{documents.size}")
-      end
-      @logger.debug("doris stream load request body: #{documents}")
+         if @log_request or @logger.debug?
+            @logger.info("doris stream load request url: #{url}  headers: 
#{table_events.http_headers}  body size: #{table_events.documents.size}")
+         end
+         @logger.debug("doris stream load request body: 
#{table_events.documents}")
+
+         request = SimpleRequestBuilder.
+            put(url).
+            setBody(table_events.documents, ContentType::TEXT_PLAIN).
+            build
+         table_events.http_headers.each do |k, v|
+            request.addHeader(k, v)
+         end
 
-      response = ""
-      begin
-         response = RestClient.put(url, documents, http_headers) { |res, 
request, result|
-                case res.code
-                when 301, 302, 307
-                    @logger.debug("redirect to: #{res.headers[:location]}")
-                    res.follow_redirection
-                else
-                  res.return!
-                end
-         }
-      rescue => e
-         log_failure("doris stream load request error: #{e}")
+         table_events.response_future = @client.execute(request, nil)
       end
-
-      response
    end # def make_request
 
    # Format the HTTP body
@@ -307,10 +388,10 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    end
 
    private
-   def save_to_disk(documents)
+   def save_to_disk(documents, table)
       begin
-         file = File.open("#{@save_dir}/#{@db}_#{@table}_#{@save_file}", "a")
-         file.write(documents)
+         file = File.open("#{@save_dir}/#{@db}_#{table}_#{@save_file}", "a")
+         file.write(documents, "\n")
       rescue IOError => e
          log_failure("An error occurred while saving file to disk: #{e}",
          :file_name => file_name)
@@ -332,4 +413,34 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
   
       headers
    end
+
+   def serialize(table_events)
+      table_events.events_count = table_events.events.size
+      table_events.documents = table_events.events.map { |e| event_body(e) 
}.join("\n")
+      table_events.events = nil # no longer used, can be gc
+
+      @logger.debug("get documents: #{table_events.documents}")
+   end
+
+   class TableEvents
+      attr_accessor :table, :http_headers, :events, :events_count, :documents, 
:req_count, :response_future
+
+      def initialize(table, http_headers)
+         @table = table
+         @http_headers = http_headers
+
+         @events = []
+         @events_count = 0
+         @documents = ""
+         @req_count = 1
+
+         @response_future = nil
+      end
+
+      def prepare_retry
+         @req_count += 1
+         @response_future = nil
+      end
+   end
+
 end # end of class LogStash::Outputs::Doris
diff --git a/extension/logstash/lib/logstash/util/delay_event.rb 
b/extension/logstash/lib/logstash/util/delay_event.rb
index 86f59ef457f..04c23a76621 100644
--- a/extension/logstash/lib/logstash/util/delay_event.rb
+++ b/extension/logstash/lib/logstash/util/delay_event.rb
@@ -20,9 +20,11 @@ require 'java'
 class DelayEvent
    include java.util.concurrent.Delayed
 
+   attr_accessor :start_time, :event
+
    def initialize(delay, event)
       @start_time = Time.now.to_i + delay
-      @event = event # event style: [documents, http_headers, event_num, 
req_count]
+      @event = event # Hash[table, TableEvents]
    end
 
    def get_delay(unit)
@@ -36,19 +38,7 @@ class DelayEvent
       d < 0 ? -1 : 1
    end
 
-   def start_time
-      @start_time
-   end
-
-   def event
-      @event
-   end
-
-   def documents
-      @event[0]
-   end
-
    def first_retry
-      @event[3] == 2
+      @event.values[0].req_count == 2
    end
 end
diff --git a/extension/logstash/logstash-output-doris.gemspec 
b/extension/logstash/logstash-output-doris.gemspec
index f44d57d0511..ba94518d279 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,7 +18,7 @@ under the License.
 =end
 Gem::Specification.new do |s|
   s.name            = 'logstash-output-doris'
-  s.version         = '1.1.0'
+  s.version         = '1.2.0'
   s.author          = 'Apache Doris'
   s.email           = '[email protected]'
   s.homepage        = 'http://doris.apache.org'
@@ -27,6 +27,8 @@ Gem::Specification.new do |s|
   s.description     = "This gem is a logstash plugin required to be installed 
on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. 
This gem is not a stand-alone program"
   s.require_paths = ["lib"]
 
+  s.platform = 'java'
+
   # Files
   s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ]
 
@@ -38,9 +40,11 @@ Gem::Specification.new do |s|
 
   # Gem dependencies
   s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
-  s.add_runtime_dependency "rest-client", '~> 2.1'
+  s.add_runtime_dependency 'jar-dependencies', ">= 0.0.2" # depends on 
logstash version
 
   s.add_development_dependency 'logstash-devutils', '~> 1.3'
   s.add_development_dependency 'sinatra', '~> 1.4'
   s.add_development_dependency 'webrick', '~> 1.9'
+
+  s.requirements << 'jar org.apache.httpcomponents.client5, httpclient5, 5.4.2'
 end


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to