This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dba88949317 [fix](logstash plugin) fix group_commit and sprintf with 
missing value (#44329)
dba88949317 is described below

commit dba889493171dfdd686b44f420022a74dc52183b
Author: Mingxi <[email protected]>
AuthorDate: Tue Nov 26 10:43:35 2024 +0800

    [fix](logstash plugin) fix group_commit and sprintf with missing value 
(#44329)
---
 extension/logstash/lib/logstash/outputs/doris.rb | 44 +++++++++-------
 extension/logstash/lib/logstash/util/formater.rb | 65 ++++++++++++++++++++++++
 extension/logstash/logstash-output-doris.gemspec |  6 +--
 3 files changed, 94 insertions(+), 21 deletions(-)

diff --git a/extension/logstash/lib/logstash/outputs/doris.rb 
b/extension/logstash/lib/logstash/outputs/doris.rb
index c57b7cb47b2..02e7591b0a3 100644
--- a/extension/logstash/lib/logstash/outputs/doris.rb
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -22,8 +22,8 @@ require "logstash/outputs/base"
 require "logstash/namespace"
 require "logstash/json"
 require "logstash/util/shortname_resolver"
+require 'logstash/util/formater'
 require "uri"
-require "logstash/plugin_mixins/http_client"
 require "securerandom"
 require "json"
 require "base64"
@@ -77,9 +77,9 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
 
 
    def print_plugin_info()
-      @@plugins = Gem::Specification.find_all{|spec| spec.name =~ 
/logstash-output-doris/ }
-      @plugin_name = @@plugins[0].name
-      @plugin_version = @@plugins[0].version
+      @plugins = Gem::Specification.find_all{|spec| spec.name =~ 
/logstash-output-doris/ }
+      @plugin_name = @plugins[0].name
+      @plugin_version = @plugins[0].version
       @logger.debug("Running #{@plugin_name} version #{@plugin_version}")
 
       @logger.info("Initialized doris output with settings",
@@ -90,17 +90,17 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
    end
 
    def register
-      @http_query = "/api/#{db}/#{table}/_stream_load"
+      @http_query = "/api/#{@db}/#{@table}/_stream_load"
 
       @hostnames_pool =
-      parse_http_hosts(http_hosts,
+      parse_http_hosts(@http_hosts,
       ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))
 
       @request_headers = make_request_headers
       @logger.info("request headers: ", @request_headers)
 
       @group_commit = false
-      if http_headers.has_key?("group_commit") && http_headers["group_commit"] 
!= "off_mode"
+      if @request_headers.has_key?("group_commit") && 
@request_headers["group_commit"] != "off_mode"
          @group_commit = true
       end
       @logger.info("group_commit: ", @group_commit)
@@ -196,7 +196,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       http_headers = @request_headers.dup
       if !@group_commit
          # only set label if group_commit is off_mode or not set, since lable 
can not be used with group_commit
-         http_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" 
+ Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
+         http_headers["label"] = @label_prefix + "_" + @db + "_" + @table + 
"_" + Time.now.strftime('%Y%m%d_%H%M%S_%L_' + SecureRandom.uuid)
       end
 
       req_count = 0
@@ -211,7 +211,15 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
          rescue => e
             @logger.warn("doris stream load response: #{response} is not a 
valid JSON")
          end
-         if response_json["Status"] == "Success"
+
+         status = response_json["Status"]
+
+         if status == 'Label Already Exists'
+           @logger.warn("Label already exists: #{response_json['Label']}, skip 
#{event_num} records.")
+           break
+         end
+
+         if status == "Success" || status == "Publish Timeout"
             @total_bytes.addAndGet(documents.size)
             @total_rows.addAndGet(event_num)
             break
@@ -221,7 +229,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
             if @max_retries >= 0 && req_count > @max_retries
                @logger.warn("DROP this batch after failed #{req_count} times.")
                if @save_on_failure
-                  @logger.warn("Try save to disk.Disk file path : 
#{save_dir}/#{table}_#{save_file}")
+                  @logger.warn("Try save to disk.Disk file path : 
#{@save_dir}/#{@table}_#{@save_file}")
                   save_to_disk(documents)
                end
                break
@@ -252,13 +260,13 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
 
       response = ""
       begin
-         response = RestClient.put(url, documents, http_headers) { |response, 
request, result|
-                case response.code
+         response = RestClient.put(url, documents, http_headers) { |res, 
request, result|
+                case res.code
                 when 301, 302, 307
-                    @logger.debug("redirect to: 
#{response.headers[:location]}")
-                    response.follow_redirection
+                    @logger.debug("redirect to: #{res.headers[:location]}")
+                    res.follow_redirection
                 else
-                    response.return!
+                  res.return!
                 end
          }
       rescue => e
@@ -304,14 +312,14 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       elsif mapping.is_a?(Array)
         mapping.map { |elem| convert_mapping(elem, event) }
       else
-        event.sprintf(mapping)
+        Formater.sprintf(event, mapping)
       end
    end
 
    private
    def save_to_disk(documents)
       begin
-         file = File.open("#{save_dir}/#{db}_#{table}_#{save_file}", "a")
+         file = File.open("#{@save_dir}/#{@db}_#{@table}_#{@save_file}", "a")
          file.write(documents)
       rescue IOError => e
          log_failure("An error occurred while saving file to disk: #{e}",
@@ -330,7 +338,7 @@ class LogStash::Outputs::Doris < LogStash::Outputs::Base
       headers = @headers || {}
       headers["Expect"] ||= "100-continue"
       headers["Content-Type"] ||= "text/plain;charset=utf-8"
-      headers["Authorization"] = "Basic " + 
Base64.strict_encode64("#{user}:#{password.value}")
+      headers["Authorization"] = "Basic " + 
Base64.strict_encode64("#{@user}:#{@password.value}")
   
       headers
    end
diff --git a/extension/logstash/lib/logstash/util/formater.rb 
b/extension/logstash/lib/logstash/util/formater.rb
new file mode 100644
index 00000000000..8c3540d7347
--- /dev/null
+++ b/extension/logstash/lib/logstash/util/formater.rb
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require 'java'
+
+class Formater
+
+  # copied from https://www.rubydoc.info/gems/logstash-event/LogStash/Event#sprintf-instance_method
+  # modified by doris: return an empty string if the template field does not exist in the event, rather than leaving the template token in place.
+  def self.sprintf(event, format)
+    format = format.to_s
+    if format.index("%").nil?
+      return format
+    end
+
+    format.gsub(/%\{[^}]+}/) do |tok|
+      # Take the inside of the %{ ... }
+      key = tok[2...-1]
+
+      if key == "+%s"
+        # Got %{+%s}, support for unix epoch time
+        next event.timestamp.to_i
+      elsif key[0, 1] == "+"
+        t = event.timestamp
+        formatter = org.joda.time.format.DateTimeFormat.forPattern(key[1..-1]) 
\
+                       .withZone(org.joda.time.DateTimeZone::UTC)
+        # next org.joda.time.Instant.new(t.tv_sec * 1000 + t.tv_usec / 1000).toDateTime.toString(formatter)
+        # Invoke a specific Instant constructor to avoid this warning in JRuby
+        #  > ambiguous Java methods found, using org.joda.time.Instant(long)
+        # org.joda.time.Instant.java_class.constructor(Java::long).new_instance(
+        #   t.tv_sec * 1000 + t.tv_usec / 1000
+        # ).to_java.toDateTime.toString(formatter)
+        mill = java.lang.Long.valueOf(t.to_i * 1000 + t.tv_usec / 1000)
+        org.joda.time.Instant.new(mill).toDateTime.toString(formatter)
+      else
+        value = event.get(key)
+        case value
+        when nil
+          '' # empty str if this field does not exist in this event
+        when Array
+          value.join(",") # join by ',' if value is an array
+        when Hash
+          value.to_json # convert hashes to json
+        else
+          value # otherwise return the value
+        end # case value
+      end # 'key' checking
+    end # format.gsub...
+  end
+
+end
diff --git a/extension/logstash/logstash-output-doris.gemspec 
b/extension/logstash/logstash-output-doris.gemspec
index 6536d89ccc3..689b93503f6 100644
--- a/extension/logstash/logstash-output-doris.gemspec
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -41,7 +41,7 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0"
   s.add_runtime_dependency "rest-client", '~> 2.1'
 
-  s.add_development_dependency 'logstash-devutils', '~> 2.0', '>= 2.0.3'
-  s.add_development_dependency 'sinatra', '~> 2.0', '>= 2.0.8.1'
-  s.add_development_dependency 'webrick', '~> 1.6'
+  s.add_development_dependency 'logstash-devutils', '~> 1.3'
+  s.add_development_dependency 'sinatra', '~> 1.4'
+  s.add_development_dependency 'webrick', '~> 1.9'
 end


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to