This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f381782 [fix] fix malloc and free mismatch issue (#7702)
f381782 is described below
commit f3817829bbc7612b46ab97be1e674d8e31ee4ad9
Author: Zhengguo Yang <[email protected]>
AuthorDate: Fri Jan 14 09:32:33 2022 +0800
[fix] fix malloc and free mismatch issue (#7702)
Memory allocated by `malloc` must be released with `free`, not `delete`.
---
be/CMakeLists.txt | 5 +++--
be/src/common/daemon.cpp | 10 ++++++----
be/src/exec/s3_reader.cpp | 5 +++--
be/src/exec/s3_writer.cpp | 14 +++++++++-----
be/src/olap/wrapper_field.h | 2 +-
be/src/runtime/routine_load/data_consumer.cpp | 2 ++
6 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index f869427..24f28b1 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -486,8 +486,6 @@ endif()
set(COMMON_THIRDPARTY
rocksdb
cyrus-sasl
- librdkafka_cpp
- librdkafka
libs2
snappy
Boost::date_time
@@ -525,6 +523,9 @@ set(COMMON_THIRDPARTY
minizip
breakpad
${AWS_LIBS}
+ # put this after lz4 to avoid using lz4 lib in librdkafka
+ librdkafka_cpp
+ librdkafka
)
if (${MAKE_TEST} STREQUAL "ON")
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index b6863b5..044feda 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -200,8 +200,10 @@ static void init_doris_metrics(const
std::vector<StorePath>& store_paths) {
DorisMetrics::instance()->initialize(init_system_metrics, disk_devices,
network_interfaces);
}
-void sigterm_handler(int signo) {
- k_doris_exit = true;
+void signal_handler(int signal) {
+ if (signal == SIGINT || signal == SIGTERM) {
+ k_doris_exit = true;
+ }
}
int install_signal(int signo, void (*handler)(int)) {
@@ -219,11 +221,11 @@ int install_signal(int signo, void (*handler)(int)) {
}
void init_signals() {
- auto ret = install_signal(SIGINT, sigterm_handler);
+ auto ret = install_signal(SIGINT, signal_handler);
if (ret < 0) {
exit(-1);
}
- ret = install_signal(SIGTERM, sigterm_handler);
+ ret = install_signal(SIGTERM, signal_handler);
if (ret < 0) {
exit(-1);
}
diff --git a/be/src/exec/s3_reader.cpp b/be/src/exec/s3_reader.cpp
index ae5cc3b..30e6daa 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/exec/s3_reader.cpp
@@ -23,6 +23,7 @@
#include "common/logging.h"
#include "gutil/strings/strcat.h"
+#include "service/backend_options.h"
#include "util/s3_util.h"
namespace doris {
@@ -64,7 +65,7 @@ Status S3Reader::open() {
} else {
std::stringstream out;
out << "Error: [" << response.GetError().GetExceptionName() << ":"
- << response.GetError().GetMessage();
+ << response.GetError().GetMessage() << "] at " <<
BackendOptions::get_localhost();
return Status::InternalError(out.str());
}
}
@@ -99,7 +100,7 @@ Status S3Reader::readat(int64_t position, int64_t nbytes,
int64_t* bytes_read, v
*bytes_read = 0;
std::stringstream out;
out << "Error: [" << response.GetError().GetExceptionName() << ":"
- << response.GetError().GetMessage();
+ << response.GetError().GetMessage() << "] at " <<
BackendOptions::get_localhost();
LOG(INFO) << out.str();
return Status::InternalError(out.str());
}
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 97545d1..17e64a4 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -23,6 +23,7 @@
#include <aws/s3/model/PutObjectRequest.h>
#include "common/logging.h"
+#include "service/backend_options.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
@@ -66,7 +67,7 @@ Status S3Writer::open() {
} else {
std::stringstream out;
out << "Error: [" << response.GetError().GetExceptionName() << ":"
- << response.GetError().GetMessage();
+ << response.GetError().GetMessage() << "] at " <<
BackendOptions::get_localhost();
return Status::InternalError(out.str());
}
}
@@ -77,11 +78,13 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len,
size_t* written_len)
return Status::OK();
}
if (!_temp_file) {
- return Status::BufferAllocFailed("The internal temporary file is not
writable.");
+ return Status::BufferAllocFailed("The internal temporary file is not
writable. at " +
+ BackendOptions::get_localhost());
}
_temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
if (!_temp_file->good()) {
- return Status::BufferAllocFailed("Could not append to the internal
temporary file.");
+ return Status::BufferAllocFailed("Could not append to the internal
temporary file. at " +
+ BackendOptions::get_localhost());
}
*written_len = buf_len;
return Status::OK();
@@ -97,7 +100,8 @@ Status S3Writer::close() {
Status S3Writer::_sync() {
if (!_temp_file) {
- return Status::BufferAllocFailed("The internal temporary file is not
writable.");
+ return Status::BufferAllocFailed("The internal temporary file is not
writable. at " +
+ BackendOptions::get_localhost());
}
CHECK_S3_CLIENT(_client);
Aws::S3::Model::PutObjectRequest request;
@@ -114,7 +118,7 @@ Status S3Writer::_sync() {
} else {
std::stringstream out;
out << "Error: [" << response.GetError().GetExceptionName() << ":"
- << response.GetError().GetMessage();
+ << response.GetError().GetMessage() << "] at " <<
BackendOptions::get_localhost();
return Status::InternalError(out.str());
}
}
diff --git a/be/src/olap/wrapper_field.h b/be/src/olap/wrapper_field.h
index 3d22a7b..51e4b3e 100644
--- a/be/src/olap/wrapper_field.h
+++ b/be/src/olap/wrapper_field.h
@@ -42,7 +42,7 @@ public:
delete _rep;
delete[] _owned_buf;
if (_long_text_buf) {
- delete _long_text_buf;
+ free(_long_text_buf);
}
}
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index 7522e86..2cf330f 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -164,6 +164,7 @@ Status KafkaDataConsumer::assign_topic_partitions(
if (err) {
LOG(WARNING) << "failed to assign topic partitions: " <<
ctx->brief(true)
<< ", err: " << RdKafka::err2str(err);
+ _k_consumer->unassign();
return Status::InternalError("failed to assign topic partitions");
}
@@ -382,6 +383,7 @@ Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
Status KafkaDataConsumer::reset() {
std::unique_lock<std::mutex> l(_lock);
_cancelled = false;
+ _k_consumer->unassign();
// reset will be called before this consumer being returned to the pool.
// so update _last_visit_time is reasonable.
_last_visit_time = time(nullptr);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]