This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f381782  [fix] fix malloc and free mismatch issue  (#7702)
f381782 is described below

commit f3817829bbc7612b46ab97be1e674d8e31ee4ad9
Author: Zhengguo Yang <[email protected]>
AuthorDate: Fri Jan 14 09:32:33 2022 +0800

    [fix] fix malloc and free mismatch issue  (#7702)
    
    The memory allocated by `malloc` should be freed by `free`
---
 be/CMakeLists.txt                             |  5 +++--
 be/src/common/daemon.cpp                      | 10 ++++++----
 be/src/exec/s3_reader.cpp                     |  5 +++--
 be/src/exec/s3_writer.cpp                     | 14 +++++++++-----
 be/src/olap/wrapper_field.h                   |  2 +-
 be/src/runtime/routine_load/data_consumer.cpp |  2 ++
 6 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index f869427..24f28b1 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -486,8 +486,6 @@ endif()
 set(COMMON_THIRDPARTY
     rocksdb
     cyrus-sasl
-    librdkafka_cpp
-    librdkafka
     libs2
     snappy
     Boost::date_time
@@ -525,6 +523,9 @@ set(COMMON_THIRDPARTY
     minizip
     breakpad
     ${AWS_LIBS}
+    # put this after lz4 to avoid using lz4 lib in librdkafka
+    librdkafka_cpp
+    librdkafka
 )
 
 if (${MAKE_TEST} STREQUAL "ON")
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index b6863b5..044feda 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -200,8 +200,10 @@ static void init_doris_metrics(const 
std::vector<StorePath>& store_paths) {
     DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, 
network_interfaces);
 }
 
-void sigterm_handler(int signo) {
-    k_doris_exit = true;
+void signal_handler(int signal) {
+    if (signal == SIGINT || signal == SIGTERM) {
+        k_doris_exit = true;
+    }
 }
 
 int install_signal(int signo, void (*handler)(int)) {
@@ -219,11 +221,11 @@ int install_signal(int signo, void (*handler)(int)) {
 }
 
 void init_signals() {
-    auto ret = install_signal(SIGINT, sigterm_handler);
+    auto ret = install_signal(SIGINT, signal_handler);
     if (ret < 0) {
         exit(-1);
     }
-    ret = install_signal(SIGTERM, sigterm_handler);
+    ret = install_signal(SIGTERM, signal_handler);
     if (ret < 0) {
         exit(-1);
     }
diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp
index ae5cc3b..30e6daa 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/exec/s3_reader.cpp
@@ -23,6 +23,7 @@
 
 #include "common/logging.h"
 #include "gutil/strings/strcat.h"
+#include "service/backend_options.h"
 #include "util/s3_util.h"
 
 namespace doris {
@@ -64,7 +65,7 @@ Status S3Reader::open() {
     } else {
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << 
BackendOptions::get_localhost();
         return Status::InternalError(out.str());
     }
 }
@@ -99,7 +100,7 @@ Status S3Reader::readat(int64_t position, int64_t nbytes, 
int64_t* bytes_read, v
         *bytes_read = 0;
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << 
BackendOptions::get_localhost();
         LOG(INFO) << out.str();
         return Status::InternalError(out.str());
     }
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 97545d1..17e64a4 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -23,6 +23,7 @@
 #include <aws/s3/model/PutObjectRequest.h>
 
 #include "common/logging.h"
+#include "service/backend_options.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
 
@@ -66,7 +67,7 @@ Status S3Writer::open() {
     } else {
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << 
BackendOptions::get_localhost();
         return Status::InternalError(out.str());
     }
 }
@@ -77,11 +78,13 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, 
size_t* written_len)
         return Status::OK();
     }
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not 
writable.");
+        return Status::BufferAllocFailed("The internal temporary file is not 
writable. at " +
+                                         BackendOptions::get_localhost());
     }
     _temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
     if (!_temp_file->good()) {
-        return Status::BufferAllocFailed("Could not append to the internal 
temporary file.");
+        return Status::BufferAllocFailed("Could not append to the internal 
temporary file. at " +
+                                         BackendOptions::get_localhost());
     }
     *written_len = buf_len;
     return Status::OK();
@@ -97,7 +100,8 @@ Status S3Writer::close() {
 
 Status S3Writer::_sync() {
     if (!_temp_file) {
-        return Status::BufferAllocFailed("The internal temporary file is not 
writable.");
+        return Status::BufferAllocFailed("The internal temporary file is not 
writable. at " +
+                                         BackendOptions::get_localhost());
     }
     CHECK_S3_CLIENT(_client);
     Aws::S3::Model::PutObjectRequest request;
@@ -114,7 +118,7 @@ Status S3Writer::_sync() {
     } else {
         std::stringstream out;
         out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage();
+            << response.GetError().GetMessage() << "] at " << 
BackendOptions::get_localhost();
         return Status::InternalError(out.str());
     }
 }
diff --git a/be/src/olap/wrapper_field.h b/be/src/olap/wrapper_field.h
index 3d22a7b..51e4b3e 100644
--- a/be/src/olap/wrapper_field.h
+++ b/be/src/olap/wrapper_field.h
@@ -42,7 +42,7 @@ public:
         delete _rep;
         delete[] _owned_buf;
         if (_long_text_buf) {
-            delete _long_text_buf;
+            free(_long_text_buf);
         }
     }
 
diff --git a/be/src/runtime/routine_load/data_consumer.cpp 
b/be/src/runtime/routine_load/data_consumer.cpp
index 7522e86..2cf330f 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -164,6 +164,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
     if (err) {
         LOG(WARNING) << "failed to assign topic partitions: " << 
ctx->brief(true)
                      << ", err: " << RdKafka::err2str(err);
+        _k_consumer->unassign();
         return Status::InternalError("failed to assign topic partitions");
     }
 
@@ -382,6 +383,7 @@ Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
 Status KafkaDataConsumer::reset() {
     std::unique_lock<std::mutex> l(_lock);
     _cancelled = false;
+    _k_consumer->unassign();
     // reset will be called before this consumer being returned to the pool.
     // so update _last_visit_time is reasonable.
     _last_visit_time = time(nullptr);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to