This is an automated email from the ASF dual-hosted git repository.
airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c70f0875572 [enhancement](plugin) logstash: support multi-table (#48040)
c70f0875572 is described below
commit c70f08755727a1542988e921cc457c6d8b73ca45
Author: Mingxi <[email protected]>
AuthorDate: Wed May 14 11:44:23 2025 +0800
[enhancement](plugin) logstash: support multi-table (#48040)
Problem Summary:
Support writing to multiple tables from the Doris logstash output plugin.
Use the Apache async HTTP client (based on NIO) to replace the Ruby rest-client.
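For example, a pipeline can route each event to a table resolved from an
event field via Logstash sprintf, falling back to `default_table` when the
field cannot be resolved (a minimal config sketch; the `table_name` field,
host and credentials are hypothetical):

```
output {
    doris {
        http_hosts => ["http://fe-host:8030"]
        user => "root"
        password => ""
        db => "example_db"
        # resolved per event; if the field is missing, the "%" placeholder
        # survives and the event falls back to default_table
        table => "%{table_name}"
        default_table => "fallback_table"
    }
}
```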
**Important**: Because the jar dependencies are added and are already
bundled in the gem package, we need to run `export JARS_SKIP="true"`
before installing the plugin into Logstash, to prevent the jars from
being looked up in the local maven repository.
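A typical build-and-install sequence might look like this (a sketch; the
built gem file name and the Logstash home path are illustrative):

```
cd extension/logstash
gem build logstash-output-doris.gemspec
# the jars are already bundled in the gem, so skip looking them up
# in the local maven repository
export JARS_SKIP="true"
${LS_HOME}/bin/logstash-plugin install ./logstash-output-doris-1.2.0-java.gem
```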
---
extension/logstash/.gitignore | 8 +
extension/logstash/lib/logstash/outputs/doris.rb | 269 +++++++++++++++------
.../logstash/lib/logstash/util/delay_event.rb | 18 +-
extension/logstash/logstash-output-doris.gemspec | 8 +-
4 files changed, 208 insertions(+), 95 deletions(-)
diff --git a/extension/logstash/.gitignore b/extension/logstash/.gitignore
new file mode 100644
index 00000000000..8a60245fb73
--- /dev/null
+++ b/extension/logstash/.gitignore
@@ -0,0 +1,8 @@
+/lib/com
+/lib/org
+/lib/logstash-output-doris_jars.rb
+
+Gemfile.lock
+
+logstash-output-doris-*.gem
+logstash-output-doris-*.zip
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
index 971d28889c4..86922036c58 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -27,11 +27,16 @@ require "uri"
require "securerandom"
require "json"
require "base64"
-require "restclient"
require 'thread'
+require 'java'
+require "#{File.dirname(__FILE__)}/../../logstash-output-doris_jars.rb"
class LogStash::Outputs::Doris < LogStash::Outputs::Base
+ include_package 'org.apache.hc.client5.http.impl.async'
+ include_package 'org.apache.hc.client5.http.async.methods'
+ include_package 'org.apache.hc.core5.http'
+
# support multi thread concurrency for performance
# so multi_receive() and function it calls are all stateless and thread safe
concurrency :shared
@@ -44,6 +49,8 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
config :db, :validate => :string, :required => true
# the table which data is loaded to
config :table, :validate => :string, :required => true
+ # default table
+ config :default_table, :validate => :string, :default => ""
# label prefix of a stream load request.
config :label_prefix, :validate => :string, :default => "logstash"
# user name
@@ -89,8 +96,21 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
:http_hosts => @http_hosts)
end
+ class DorisRedirectStrategy < Java::org.apache.hc.client5.http.impl.DefaultRedirectStrategy
+ def getLocationURI(request, response, context)
+ uri = super(request, response, context)
+ # remove user info in redirect uri
+ java.net.URI.new(uri.getScheme, nil, uri.getHost, uri.getPort, uri.getPath, uri.getQuery, uri.getFragment)
+ end
+ end
+
+ def http_query(table)
+ "/api/#{@db}/#{table}/_stream_load"
+ end
+
def register
- @http_query = "/api/#{@db}/#{@table}/_stream_load"
+ @client = HttpAsyncClients.custom.setRedirectStrategy(DorisRedirectStrategy.new).build
+ @client.start
@request_headers = make_request_headers
@logger.info("request headers: ", @request_headers)
@@ -143,18 +163,24 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
# retry queue size in bytes
@retry_queue_bytes = java.util.concurrent.atomic.AtomicLong.new(0)
retry_thread = Thread.new do
- while popped = @retry_queue.take
- documents, http_headers, event_num, req_count = popped.event
- handle_request(documents, http_headers, event_num, req_count)
+ while (popped = @retry_queue.take)
+ table_events_map = popped.event
+ handle_request(table_events_map)
end
end
- print_plugin_info()
+ @const_table = @table.index("%").nil?
+
+ print_plugin_info
end # def register
private
def add_event_to_retry_queue(delay_event)
- event_size = delay_event.documents.size
+ event_size = 0
+ delay_event.event.each do |_, table_events|
+ event_size += table_events.documents.size
+ end
+
if delay_event.first_retry
while @retry_queue_bytes.get + event_size > @max_retry_queue_mb * 1024 * 1024
sleep(1)
@@ -169,21 +195,47 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
send_events(events)
end
- private
- def send_events(events)
- documents = events.map { |event| event_body(event) }.join("\n")
- event_num = events.size
-
- # @logger.info("get event num: #{event_num}")
- @logger.debug("get documents: #{documents}")
-
+ def create_http_headers(table)
http_headers = @request_headers.dup
if !@group_commit
# only set label if group_commit is off_mode or not set, since lable can not be used with group_commit
- http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+ http_headers["label"] = @label_prefix + "_" + @db + "_" + table + "_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+ end
+ http_headers
+ end
+
+ private
+ def send_events(events)
+ table_events_map = Hash.new
+ if @const_table
+ table_events = TableEvents.new(@table, create_http_headers(@table))
+ table_events.events = events
+ table_events_map[@table] = table_events
+ else
+ events.each do |event|
+ table = event.sprintf(@table)
+ if table == "" || !table.index("%").nil?
+ table = @default_table
+ if table == ""
+ @logger.warn("table format error, the default table is not
set, the data will be dropped")
+ else
+ @logger.warn("table format error, use the default table:
#{table}")
+ end
+ end
+ table_events = table_events_map[table]
+ if table_events == nil
+ table_events = TableEvents.new(table, create_http_headers(table))
+ table_events_map[table] = table_events
+ end
+ table_events.events << event
+ end
end
- handle_request(documents, http_headers, event_num, 1)
+ table_events_map.each do |_, table_events|
+ serialize(table_events)
+ end
+
+ handle_request(table_events_map)
end
def sleep_for_attempt(attempt)
@@ -192,82 +244,111 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
(sleep_for/2) + (rand(0..sleep_for)/2)
end
+ STAT_SUCCESS = 0
+ STAT_FAIL = 1
+ STAT_RETRY = 2
+
private
- def handle_request(documents, http_headers, event_num, req_count)
- response = make_request(documents, http_headers, @http_query, @http_hosts.sample)
- response_json = {}
- begin
- response_json = JSON.parse(response.body)
- rescue => _
- @logger.warn("doris stream load response is not a valid
JSON:\n#{response}")
- end
+ def handle_request(table_events_map)
+ make_request(table_events_map)
+ retry_map = Hash.new
+ table_events_map.each do |table, table_events|
+ stat = STAT_SUCCESS
+
+ if table == ""
+ @logger.warn("drop #{table_events.events_count} records because of
empty table")
+ stat = STAT_FAIL
+ end
- status = response_json["Status"]
+ response = ""
+ if stat == STAT_SUCCESS
+ begin
+ response = table_events.response_future.get.getBodyText
+ rescue => e
+ log_failure("doris stream load request error: #{e}")
+ stat = STAT_RETRY
+ end
+ end
- need_retry = true
+ response_json = {}
+ if stat == STAT_SUCCESS
+ begin
+ response_json = JSON.parse(response)
+ rescue => _
+ @logger.warn("doris stream load response is not a valid
JSON:\n#{response}")
+ stat = STAT_RETRY
+ end
+ end
- if status == 'Label Already Exists'
- @logger.warn("Label already exists: #{response_json['Label']}, skip
#{event_num} records:\n#{response}")
- need_retry = false
+ if stat == STAT_SUCCESS
+ status = response_json["Status"]
+
+ if status == 'Label Already Exists'
+ @logger.warn("Label already exists: #{response_json['Label']},
skip #{table_events.events_count} records:\n#{response}")
+
+ elsif status == "Success" || status == "Publish Timeout"
+ @total_bytes.addAndGet(table_events.documents.size)
+ @total_rows.addAndGet(table_events.events_count)
+ if @log_request or @logger.debug?
+ @logger.info("doris stream load response:\n#{response}")
+ end
+
+ else
+ @logger.warn("FAILED doris stream load response:\n#{response}")
+ if @max_retries >= 0 && table_events.req_count - 1 >= @max_retries
+ @logger.warn("DROP this batch after failed #{table_events.req_count} times.")
+ stat = STAT_FAIL
+ else
+ stat = STAT_RETRY
+ end
+ end
+ end
- elsif status == "Success" || status == "Publish Timeout"
- @total_bytes.addAndGet(documents.size)
- @total_rows.addAndGet(event_num)
- if @log_request or @logger.debug?
- @logger.info("doris stream load response:\n#{response}")
+ if stat == STAT_FAIL && @save_on_failure
+ @logger.warn("Try save to disk.Disk file path :
#{@save_dir}/#{table}_#{@save_file}")
+ save_to_disk(table_events.documents, table)
end
- need_retry = false
-
- elsif @max_retries >= 0 && req_count - 1 > @max_retries
- @logger.warn("FAILED doris stream load response:\n#{response}")
- @logger.warn("DROP this batch after failed #{req_count} times.")
- if @save_on_failure
- @logger.warn("Try save to disk.Disk file path :
#{@save_dir}/#{@table}_#{@save_file}")
- save_to_disk(documents)
+
+ if stat != STAT_RETRY && table_events.req_count > 1
+ @retry_queue_bytes.addAndGet(-table_events.documents.size)
end
- need_retry = false
- end
- if !need_retry
- if req_count > 1
- @retry_queue_bytes.addAndGet(-documents.size)
+ if stat == STAT_RETRY
+ table_events.prepare_retry
+ retry_map[table] = table_events
end
- return
end
- # add to retry_queue
- sleep_for = sleep_for_attempt(req_count)
- @logger.warn("FAILED doris stream load response:\n#{response}")
- @logger.warn("Will do the #{req_count}th retry after #{sleep_for} secs.")
- delay_event = DelayEvent.new(sleep_for, [documents, http_headers, event_num, req_count+1])
- add_event_to_retry_queue(delay_event)
+ if retry_map.size > 0
+ # add to retry_queue
+ req_count = retry_map.values[0].req_count
+ sleep_for = sleep_for_attempt(req_count)
+ @logger.warn("Will do the #{req_count-1}th retry after #{sleep_for}
secs.")
+ delay_event = DelayEvent.new(sleep_for, retry_map)
+ add_event_to_retry_queue(delay_event)
+ end
end
private
- def make_request(documents, http_headers, query, host)
- url = host + query
+ def make_request(table_events_map)
+ table_events_map.each do |table, table_events|
+ url = @http_hosts.sample + http_query(table)
- if @log_request or @logger.debug?
- @logger.info("doris stream load request url: #{url} headers:
#{http_headers} body size: #{documents.size}")
- end
- @logger.debug("doris stream load request body: #{documents}")
+ if @log_request or @logger.debug?
+ @logger.info("doris stream load request url: #{url} headers:
#{table_events.http_headers} body size: #{table_events.documents.size}")
+ end
+ @logger.debug("doris stream load request body: #{table_events.documents}")
+
+ request = SimpleRequestBuilder.
+ put(url).
+ setBody(table_events.documents, ContentType::TEXT_PLAIN).
+ build
+ table_events.http_headers.each do |k, v|
+ request.addHeader(k, v)
+ end
- response = ""
- begin
- response = RestClient.put(url, documents, http_headers) { |res, request, result|
- case res.code
- when 301, 302, 307
- @logger.debug("redirect to: #{res.headers[:location]}")
- res.follow_redirection
- else
- res.return!
- end
- }
- rescue => e
- log_failure("doris stream load request error: #{e}")
+ table_events.response_future = @client.execute(request, nil)
end
-
- response
end # def make_request
# Format the HTTP body
@@ -307,10 +388,10 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
private
- def save_to_disk(documents)
+ def save_to_disk(documents, table)
begin
- file = File.open("#{@save_dir}/#{@db}_#{@table}_#{@save_file}", "a")
- file.write(documents)
+ file = File.open("#{@save_dir}/#{@db}_#{table}_#{@save_file}", "a")
+ file.write(documents, "\n")
rescue IOError => e
log_failure("An error occurred while saving file to disk: #{e}",
:file_name => file_name)
@@ -332,4 +413,34 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
headers
end
+
+ def serialize(table_events)
+ table_events.events_count = table_events.events.size
+ table_events.documents = table_events.events.map { |e| event_body(e) }.join("\n")
+ table_events.events = nil # no longer used, can be gc
+
+ @logger.debug("get documents: #{table_events.documents}")
+ end
+
+ class TableEvents
+ attr_accessor :table, :http_headers, :events, :events_count, :documents, :req_count, :response_future
+
+ def initialize(table, http_headers)
+ @table = table
+ @http_headers = http_headers
+
+ @events = []
+ @events_count = 0
+ @documents = ""
+ @req_count = 1
+
+ @response_future = nil
+ end
+
+ def prepare_retry
+ @req_count += 1
+ @response_future = nil
+ end
+ end
+
end # end of class LogStash::Outputs::Doris
diff --git a/extension/logstash/lib/logstash/util/delay_event.rb b/extension/logstash/lib/logstash/util/delay_event.rb
index 86f59ef457f..04c23a76621 100644
--- a/extension/logstash/lib/logstash/util/delay_event.rb
+++ b/extension/logstash/lib/logstash/util/delay_event.rb
@@ -20,9 +20,11 @@ require 'java'
class DelayEvent
include java.util.concurrent.Delayed
+ attr_accessor :start_time, :event
+
def initialize(delay, event)
@start_time = Time.now.to_i + delay
- @event = event # event style: [documents, http_headers, event_num, req_count]
+ @event = event # Hash[table, TableEvents]
end
def get_delay(unit)
@@ -36,19 +38,7 @@ class DelayEvent
d < 0 ? -1 : 1
end
- def start_time
- @start_time
- end
-
- def event
- @event
- end
-
- def documents
- @event[0]
- end
-
def first_retry
- @event[3] == 2
+ @event.values[0].req_count == 2
end
end
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
index f44d57d0511..ba94518d279 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -18,7 +18,7 @@ under the License.
=end
Gem::Specification.new do |s|
s.name = 'logstash-output-doris'
- s.version = '1.1.0'
+ s.version = '1.2.0'
s.author = 'Apache Doris'
s.email = '[email protected]'
s.homepage = 'http://doris.apache.org'
@@ -27,6 +27,8 @@ Gem::Specification.new do |s|
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
s.require_paths = ["lib"]
+ s.platform = 'java'
+
# Files
s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ]
@@ -38,9 +40,11 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
- s.add_runtime_dependency "rest-client", '~> 2.1'
+ s.add_runtime_dependency 'jar-dependencies', ">= 0.0.2" # depends on logstash version
s.add_development_dependency 'logstash-devutils', '~> 1.3'
s.add_development_dependency 'sinatra', '~> 1.4'
s.add_development_dependency 'webrick', '~> 1.9'
+
+ s.requirements << 'jar org.apache.httpcomponents.client5, httpclient5, 5.4.2'
end
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]