This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dba88949317 [fix](logstash plugin) fix group_commit and sprintf with
missing value (#44329)
dba88949317 is described below
commit dba889493171dfdd686b44f420022a74dc52183b
Author: Mingxi <[email protected]>
AuthorDate: Tue Nov 26 10:43:35 2024 +0800
[fix](logstash plugin) fix group_commit and sprintf with missing value
(#44329)
---
extension/logstash/lib/logstash/outputs/doris.rb | 44 +++++++++-------
extension/logstash/lib/logstash/util/formater.rb | 65 ++++++++++++++++++++++++
extension/logstash/logstash-output-doris.gemspec | 6 +--
3 files changed, 94 insertions(+), 21 deletions(-)
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb
b/extension/logstash/lib/logstash/outputs/doris.rb
index c57b7cb47b2..02e7591b0a3 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -22,8 +22,8 @@ require "logstash/outputs/base"
require "logstash/namespace"
require "logstash/json"
require "logstash/util/shortname_resolver"
+require 'logstash/util/formater'
require "uri"
-require "logstash/plugin_mixins/http_client"
require "securerandom"
require "json"
require "base64"
@@ -77,9 +77,9 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
def print_plugin_info()
- @@plugins = Gem::Specification.find_all{|spec| spec.name =~
/logstash-output-doris/ }
- @plugin_name = @@plugins[0].name
- @plugin_version = @@plugins[0].version
+ @plugins = Gem::Specification.find_all{|spec| spec.name =~
/logstash-output-doris/ }
+ @plugin_name = @plugins[0].name
+ @plugin_version = @plugins[0].version
@logger.debug("Running #{@plugin_name} version #{@plugin_version}")
@logger.info("Initialized doris output with settings",
@@ -90,17 +90,17 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
end
def register
- @http_query = "/api/#{db}/#{table}/_stream_load"
+ @http_query = "/api/#{@db}/#{@table}/_stream_load"
@hostnames_pool =
- parse_http_hosts(http_hosts,
+ parse_http_hosts(@http_hosts,
ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))
@request_headers = make_request_headers
@logger.info("request headers: ", @request_headers)
@group_commit = false
- if http_headers.has_key?("group_commit") && http_headers["group_commit"]
!= "off_mode"
+ if @request_headers.has_key?("group_commit") &&
@request_headers["group_commit"] != "off_mode"
@group_commit = true
end
@logger.info("group_commit: ", @group_commit)
@@ -196,7 +196,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
http_headers = @request_headers.dup
if !@group_commit
# only set label if group_commit is off_mode or not set, since lable
can not be used with group_commit
- http_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_"
+ Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+ http_headers["label"] = @label_prefix + "_" + @db + "_" + @table +
"_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
end
req_count = 0
@@ -211,7 +211,15 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
rescue => e
@logger.warn("doris stream load response: #{response} is not a
valid JSON")
end
- if response_json["Status"] == "Success"
+
+ status = response_json["Status"]
+
+ if status == 'Label Already Exists'
+ @logger.warn("Label already exists: #{response_json['Label']}, skip
#{event_num} records.")
+ break
+ end
+
+ if status == "Success" || status == "Publish Timeout"
@total_bytes.addAndGet(documents.size)
@total_rows.addAndGet(event_num)
break
@@ -221,7 +229,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
if @max_retries >= 0 && req_count > @max_retries
@logger.warn("DROP this batch after failed #{req_count} times.")
if @save_on_failure
- @logger.warn("Try save to disk.Disk file path :
#{save_dir}/#{table}_#{save_file}")
+ @logger.warn("Try save to disk.Disk file path :
#{@save_dir}/#{@table}_#{@save_file}")
save_to_disk(documents)
end
break
@@ -252,13 +260,13 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
response = ""
begin
- response = RestClient.put(url, documents, http_headers) { |response,
request, result|
- case response.code
+ response = RestClient.put(url, documents, http_headers) { |res,
request, result|
+ case res.code
when 301, 302, 307
- @logger.debug("redirect to:
#{response.headers[:location]}")
- response.follow_redirection
+ @logger.debug("redirect to: #{res.headers[:location]}")
+ res.follow_redirection
else
- response.return!
+ res.return!
end
}
rescue => e
@@ -304,14 +312,14 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
elsif mapping.is_a?(Array)
mapping.map { |elem| convert_mapping(elem, event) }
else
- event.sprintf(mapping)
+ Formater.sprintf(event, mapping)
end
end
private
def save_to_disk(documents)
begin
- file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
+ file = File.open("#{@save_dir}/#{@db}_#{@table}_#{@save_file}", "a")
file.write(documents)
rescue IOError => e
log_failure("An error occurred while saving file to disk: #{e}",
@@ -330,7 +338,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
headers = @headers || {}
headers["Expect"] ||= "100-continue"
headers["Content-Type"] ||= "text/plain;charset=utf-8"
- headers["Authorization"] = "Basic " +
Base64.strict_encode64("#{user}:#{password.value}")
+ headers["Authorization"] = "Basic " +
Base64.strict_encode64("#{@user}:#{@password.value}")
headers
end
diff --git a/extension/logstash/lib/logstash/util/formater.rb
b/extension/logstash/lib/logstash/util/formater.rb
new file mode 100644
index 00000000000..8c3540d7347
--- /dev/null
+++ b/extension/logstash/lib/logstash/util/formater.rb
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'java'
+
# Event-aware template expansion used by the Doris output to resolve
# "%{...}" references in mapping templates.
#
# NOTE(review): the class name is a misspelling of "Formatter", but callers
# reference it as Formater.sprintf, so the name is kept for compatibility.
class Formater
  # One "%{...}" reference inside a template.
  FIELD_REFERENCE = /%\{[^}]+}/

  # Expands every "%{...}" reference in a template against an event.
  #
  # Adapted from LogStash::Event#sprintf
  # (https://www.rubydoc.info/gems/logstash-event/LogStash/Event#sprintf-instance_method).
  # Modified for Doris: a reference that cannot be resolved in the event
  # (missing field, or missing timestamp for the "+" forms) expands to an
  # empty string instead of being left verbatim in the output.
  #
  # Supported references:
  #   %{+%s}      - event timestamp as Unix epoch seconds
  #   %{+PATTERN} - event timestamp rendered in UTC with a Joda-Time pattern
  #   %{field}    - field value; arrays are joined with ",", hashes become JSON
  #
  # @param event    the event to read from; must respond to #get and #timestamp
  # @param template [#to_s] the template; non-strings are converted with #to_s
  # @return [String] the expanded template
  def self.sprintf(event, template)
    text = template.to_s
    # Fast path: a template without "%" cannot contain any reference.
    return text unless text.include?("%")

    text.gsub(FIELD_REFERENCE) do |token|
      key = token[2...-1] # strip the surrounding "%{" and "}"
      if key.start_with?("+")
        expand_timestamp(event.timestamp, key)
      else
        expand_field(event.get(key))
      end
    end
  end

  # Renders the event timestamp for a "+"-prefixed reference key.
  # Returns '' when the event has no timestamp, matching the missing-field policy.
  def self.expand_timestamp(timestamp, key)
    return '' if timestamp.nil?
    # %{+%s}: Unix epoch seconds (gsub converts the Integer with #to_s).
    return timestamp.to_i if key == "+%s"

    # Runs under JRuby only: uses Joda-Time classes through Java integration.
    formatter = org.joda.time.format.DateTimeFormat.forPattern(key[1..-1])
                   .withZone(org.joda.time.DateTimeZone::UTC)
    # A plain Ruby Integer makes JRuby warn about ambiguous
    # org.joda.time.Instant constructors; passing an explicit java.lang.Long
    # is the workaround used here.
    millis = java.lang.Long.valueOf(timestamp.to_i * 1000 + timestamp.tv_usec / 1000)
    org.joda.time.Instant.new(millis).toDateTime.toString(formatter)
  end
  private_class_method :expand_timestamp

  # Converts a field value to its template representation.
  # Hash#to_json requires the 'json' library to be loaded by the caller.
  def self.expand_field(value)
    case value
    when nil then ''                # field does not exist in this event
    when Array then value.join(",") # arrays are joined with ','
    when Hash then value.to_json    # hashes are rendered as JSON
    else value                      # anything else is stringified by gsub
    end
  end
  private_class_method :expand_field
end
diff --git a/extension/logstash/logstash-output-doris.gemspec
b/extension/logstash/logstash-output-doris.gemspec
index 6536d89ccc3..689b93503f6 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -41,7 +41,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0"
s.add_runtime_dependency "rest-client", '~> 2.1'
- s.add_development_dependency 'logstash-devutils', '~> 2.0', '>= 2.0.3'
- s.add_development_dependency 'sinatra', '~> 2.0', '>= 2.0.8.1'
- s.add_development_dependency 'webrick', '~> 1.6'
+ s.add_development_dependency 'logstash-devutils', '~> 1.3'
+ s.add_development_dependency 'sinatra', '~> 1.4'
+ s.add_development_dependency 'webrick', '~> 1.9'
end
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]