This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5fed34f [optimize] provide a better defer operator (#5706)
5fed34f is described below
commit 5fed34fcfe296e312653a3a30afffc5a13b9e854
Author: stdpain <[email protected]>
AuthorDate: Wed May 12 10:37:23 2021 +0800
[optimize] provide a better defer operator (#5706)
---
be/src/olap/schema_change.cpp | 67 +++++++++++++---------
be/src/runtime/routine_load/data_consumer.cpp | 19 +++---
.../routine_load/routine_load_task_executor.cpp | 5 +-
be/src/runtime/sorted_run_merger.cc | 2 +-
be/src/util/defer_op.h | 19 ++++--
be/src/util/file_utils.cpp | 8 +--
be/src/util/mysql_load_error_hub.cpp | 2 +-
7 files changed, 68 insertions(+), 54 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 43d3a2a..e738897 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -74,8 +74,8 @@ public:
explicit RowBlockMerger(TabletSharedPtr tablet);
virtual ~RowBlockMerger();
- bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter*
rowset_writer, std::shared_ptr<MemTracker> parent,
- uint64_t* merged_rows);
+ bool merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter*
rowset_writer,
+ std::shared_ptr<MemTracker> parent, uint64_t* merged_rows);
private:
struct MergeElement {
@@ -713,7 +713,8 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
return true;
}
-RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema,
std::shared_ptr<MemTracker> parent, size_t memory_limitation)
+RowBlockAllocator::RowBlockAllocator(const TabletSchema& tablet_schema,
+ std::shared_ptr<MemTracker> parent,
size_t memory_limitation)
: _tablet_schema(tablet_schema),
_mem_tracker(MemTracker::CreateTracker(-1, "RowBlockAllocator",
parent, false)),
_row_len(tablet_schema.row_size()),
@@ -723,16 +724,18 @@ RowBlockAllocator::RowBlockAllocator(const TabletSchema&
tablet_schema, std::sha
RowBlockAllocator::~RowBlockAllocator() {
if (_mem_tracker->consumption() != 0) {
- LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size=" <<
_mem_tracker->consumption();
+ LOG(WARNING) << "memory lost in RowBlockAllocator. memory_size="
+ << _mem_tracker->consumption();
}
}
OLAPStatus RowBlockAllocator::allocate(RowBlock** row_block, size_t num_rows,
bool null_supported) {
size_t row_block_size = _row_len * num_rows;
- if (_memory_limitation > 0 && _mem_tracker->consumption() + row_block_size
> _memory_limitation) {
+ if (_memory_limitation > 0 &&
+ _mem_tracker->consumption() + row_block_size > _memory_limitation) {
LOG(WARNING) << "RowBlockAllocator::alocate() memory exceeded. "
- << "m_memory_allocated=" << _mem_tracker->consumption();
+ << "m_memory_allocated=" << _mem_tracker->consumption();
*row_block = nullptr;
return OLAP_SUCCESS;
}
@@ -751,7 +754,8 @@ OLAPStatus RowBlockAllocator::allocate(RowBlock**
row_block, size_t num_rows, bo
_mem_tracker->Consume(row_block_size);
VLOG_NOTICE << "RowBlockAllocator::allocate() this=" << this << ",
num_rows=" << num_rows
- << ", m_memory_allocated=" << _mem_tracker->consumption() << ",
row_block_addr=" << *row_block;
+ << ", m_memory_allocated=" << _mem_tracker->consumption()
+ << ", row_block_addr=" << *row_block;
return OLAP_SUCCESS;
}
@@ -765,7 +769,8 @@ void RowBlockAllocator::release(RowBlock* row_block) {
VLOG_NOTICE << "RowBlockAllocator::release() this=" << this
<< ", num_rows=" << row_block->capacity()
- << ", m_memory_allocated=" << _mem_tracker->consumption() <<
", row_block_addr=" << row_block;
+ << ", m_memory_allocated=" << _mem_tracker->consumption()
+ << ", row_block_addr=" << row_block;
delete row_block;
}
@@ -773,11 +778,12 @@ RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) :
_tablet(tablet) {}
RowBlockMerger::~RowBlockMerger() {}
-bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr,
RowsetWriter* rowset_writer, std::shared_ptr<MemTracker> parent,
- uint64_t* merged_rows) {
+bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr,
RowsetWriter* rowset_writer,
+ std::shared_ptr<MemTracker> parent, uint64_t*
merged_rows) {
uint64_t tmp_merged_rows = 0;
RowCursor row_cursor;
- std::shared_ptr<MemTracker> tracker(MemTracker::CreateTracker(-1,
"RowBlockMerger", parent, false));
+ std::shared_ptr<MemTracker> tracker(
+ MemTracker::CreateTracker(-1, "RowBlockMerger", parent, false));
std::unique_ptr<MemPool> mem_pool(new MemPool(tracker.get()));
std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) {
@@ -898,8 +904,12 @@ OLAPStatus
LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
return status;
}
-SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger&
row_block_changer, std::shared_ptr<MemTracker> mem_tracker)
- : SchemaChange(mem_tracker), _row_block_changer(row_block_changer),
_row_block_allocator(nullptr), _cursor(nullptr) {}
+SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger&
row_block_changer,
+ std::shared_ptr<MemTracker>
mem_tracker)
+ : SchemaChange(mem_tracker),
+ _row_block_changer(row_block_changer),
+ _row_block_allocator(nullptr),
+ _cursor(nullptr) {}
SchemaChangeDirectly::~SchemaChangeDirectly() {
VLOG_NOTICE << "~SchemaChangeDirectly()";
@@ -920,7 +930,7 @@ bool SchemaChangeDirectly::_write_row_block(RowsetWriter*
rowset_writer, RowBloc
}
OLAPStatus reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>*
block_handle_ptr, int row_num,
- RowBlockAllocator* allocator) {
+ RowBlockAllocator* allocator) {
auto& block_handle = *block_handle_ptr;
if (block_handle == nullptr || block_handle->capacity() < row_num) {
// release old block and alloc new block
@@ -1010,7 +1020,7 @@ OLAPStatus
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
res = _row_block_changer.change_row_block(ref_row_block,
rowset_reader->version().second,
new_row_block.get(),
&filtered_rows);
RETURN_NOT_OK_LOG(res, "failed to change data in row block.");
-
+
// rows filtered by delete handler one by one
add_filtered_rows(filtered_rows);
@@ -1053,7 +1063,8 @@ OLAPStatus
SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
return res;
}
-SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger&
row_block_changer, std::shared_ptr<MemTracker> mem_tracker,
+SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger&
row_block_changer,
+ std::shared_ptr<MemTracker>
mem_tracker,
size_t memory_limitation)
: SchemaChange(mem_tracker),
_row_block_changer(row_block_changer),
@@ -1078,8 +1089,8 @@ OLAPStatus
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet) {
if (_row_block_allocator == nullptr) {
- _row_block_allocator =
- new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(),
_mem_tracker, _memory_limitation);
+ _row_block_allocator = new (nothrow)
+ RowBlockAllocator(new_tablet->tablet_schema(), _mem_tracker,
_memory_limitation);
if (_row_block_allocator == nullptr) {
LOG(FATAL) << "failed to malloc RowBlockAllocator. size=" <<
sizeof(RowBlockAllocator);
return OLAP_ERR_INPUT_PARAMETER_ERROR;
@@ -1114,7 +1125,7 @@ OLAPStatus
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
// src_rowsets to store the rowset generated by internal sorting
std::vector<RowsetSharedPtr> src_rowsets;
- DeferOp defer([&]() {
+ Defer defer{[&]() {
// remove the intermediate rowsets generated by internal sorting
for (auto& row_set : src_rowsets) {
StorageEngine::instance()->add_unused_rowset(row_set);
@@ -1125,7 +1136,7 @@ OLAPStatus
SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
}
row_block_arr.clear();
- });
+ }};
_temp_delta_versions.first = _temp_delta_versions.second;
@@ -1349,10 +1360,10 @@ bool
SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
return true;
}
-SchemaChangeHandler::SchemaChangeHandler() :
_mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) {
- REGISTER_HOOK_METRIC(schema_change_mem_consumption, [this]() {
- return _mem_tracker->consumption();
- });
+SchemaChangeHandler::SchemaChangeHandler()
+ : _mem_tracker(MemTracker::CreateTracker(-1, "SchemaChange")) {
+ REGISTER_HOOK_METRIC(schema_change_mem_consumption,
+ [this]() { return _mem_tracker->consumption(); });
}
SchemaChangeHandler::~SchemaChangeHandler() {
@@ -1644,8 +1655,8 @@ OLAPStatus
SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
size_t memory_limitation =
config::memory_limitation_per_thread_for_schema_change;
LOG(INFO) << "doing schema change with sorting for base_tablet "
<< base_tablet->full_name();
- sc_procedure = new (nothrow)
- SchemaChangeWithSorting(rb_changer, _mem_tracker,
memory_limitation * 1024 * 1024 * 1024);
+ sc_procedure = new (nothrow) SchemaChangeWithSorting(
+ rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 *
1024);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet " <<
base_tablet->full_name();
sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer,
_mem_tracker);
@@ -1852,8 +1863,8 @@ OLAPStatus
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangePa
size_t memory_limitation =
config::memory_limitation_per_thread_for_schema_change;
LOG(INFO) << "doing schema change with sorting for base_tablet "
<< sc_params.base_tablet->full_name();
- sc_procedure = new (nothrow)
- SchemaChangeWithSorting(rb_changer, _mem_tracker,
memory_limitation * 1024 * 1024 * 1024);
+ sc_procedure = new (nothrow) SchemaChangeWithSorting(
+ rb_changer, _mem_tracker, memory_limitation * 1024 * 1024 *
1024);
} else if (sc_directly) {
LOG(INFO) << "doing schema change directly for base_tablet "
<< sc_params.base_tablet->full_name();
diff --git a/be/src/runtime/routine_load/data_consumer.cpp
b/be/src/runtime/routine_load/data_consumer.cpp
index 157f69a..301ad24 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -44,8 +44,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// conf has to be deleted finally
- auto conf_deleter = [conf]() { delete conf; };
- DeferOp delete_conf(std::bind<void>(conf_deleter));
+ Defer delete_conf{[conf]() { delete conf; }};
std::stringstream ss;
ss << BackendOptions::get_localhost() << "_";
@@ -146,11 +145,10 @@ Status KafkaDataConsumer::assign_topic_partitions(
<< " assign topic partitions: " << topic << ", " << ss.str();
// delete TopicPartition finally
- auto tp_deleter = [&topic_partitions]() {
+ Defer delete_tp{[&topic_partitions]() {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
- };
- DeferOp delete_tp(std::bind<void>(tp_deleter));
+ }};
// assign partition
RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
@@ -238,8 +236,7 @@ Status
KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>*
partition_ids) {
// create topic conf
RdKafka::Conf* tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
- auto conf_deleter = [tconf]() { delete tconf; };
- DeferOp delete_conf(std::bind<void>(conf_deleter));
+ Defer delete_conf{[tconf]() { delete tconf; }};
// create topic
std::string errstr;
@@ -250,8 +247,8 @@ Status
KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
- auto topic_deleter = [topic]() { delete topic; };
- DeferOp delete_topic(std::bind<void>(topic_deleter));
+
+ Defer delete_topic{[topic]() { delete topic; }};
// get topic metadata
RdKafka::Metadata* metadata = nullptr;
@@ -263,8 +260,8 @@ Status
KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
- auto meta_deleter = [metadata]() { delete metadata; };
- DeferOp delete_meta(std::bind<void>(meta_deleter));
+
+ Defer delete_meta{[metadata]() { delete metadata; }};
// get partition ids
RdKafka::Metadata::TopicMetadataIterator it;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 6d40c9d..ed0abc9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -293,11 +293,10 @@ void
RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
_data_consumer_pool.return_consumer(consumer);
// delete TopicPartition finally
- auto tp_deleter = [&topic_partitions]() {
+ Defer delete_tp{[&topic_partitions]() {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
[](RdKafka::TopicPartition* tp1) { delete tp1; });
- };
- DeferOp delete_tp(std::bind<void>(tp_deleter));
+ }};
} break;
default:
return;
diff --git a/be/src/runtime/sorted_run_merger.cc
b/be/src/runtime/sorted_run_merger.cc
index 8b6f283..de5805c 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -188,7 +188,7 @@ private:
// do merge from sender queue data
_status_backup = _sorted_run(&_input_row_batch_backup);
_backup_ready = true;
- DeferOp defer_op([this]() { _batch_prepared_cv.notify_one(); });
+ Defer defer_op{[this]() { _batch_prepared_cv.notify_one(); }};
if (!_status_backup.ok() || _input_row_batch_backup == nullptr ||
_cancel) {
if (!_status_backup.ok()) _input_row_batch_backup = nullptr;
diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h
index 467ca9a..544e151 100644
--- a/be/src/util/defer_op.h
+++ b/be/src/util/defer_op.h
@@ -23,15 +23,22 @@
namespace doris {
// This class is used to defer a function when this object is deconstruct
-class DeferOp {
+// A Better Defer operator #5576
+// for C++17
+// Defer defer {[]{ call something }};
+//
+// for C++11
+// auto op = [] {};
+// Defer<decltype<op>> (op);
+template <class T>
+class Defer {
public:
- typedef std::function<void()> DeferFunction;
- DeferOp(const DeferFunction& func) : _func(func) {}
-
- ~DeferOp() { _func(); };
+ Defer(T& closure) : _closure(closure) {}
+ Defer(T&& closure) : _closure(std::move(closure)) {}
+ ~Defer() { _closure(); }
private:
- DeferFunction _func;
+ T _closure;
};
} // namespace doris
diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp
index 8825226..fd934b5 100644
--- a/be/src/util/file_utils.cpp
+++ b/be/src/util/file_utils.cpp
@@ -26,6 +26,7 @@
#include <algorithm>
#include <filesystem>
#include <iomanip>
+#include <memory>
#include <sstream>
#include "env/env.h"
@@ -240,16 +241,15 @@ Status FileUtils::copy_file(const std::string& src_path,
const std::string& dest
}
const int64_t BUF_SIZE = 8192;
- char* buf = new char[BUF_SIZE];
- DeferOp free_buf(std::bind<void>(std::default_delete<char[]>(), buf));
+ std::unique_ptr<char[]> buf = std::make_unique<char[]>(BUF_SIZE);
int64_t src_length = src_file.length();
int64_t offset = 0;
while (src_length > 0) {
int64_t to_read = BUF_SIZE < src_length ? BUF_SIZE : src_length;
- if (OLAP_SUCCESS != (src_file.pread(buf, to_read, offset))) {
+ if (OLAP_SUCCESS != (src_file.pread(buf.get(), to_read, offset))) {
return Status::InternalError("Internal Error");
}
- if (OLAP_SUCCESS != (dest_file.pwrite(buf, to_read, offset))) {
+ if (OLAP_SUCCESS != (dest_file.pwrite(buf.get(), to_read, offset))) {
return Status::InternalError("Internal Error");
}
diff --git a/be/src/util/mysql_load_error_hub.cpp
b/be/src/util/mysql_load_error_hub.cpp
index 17a387c..920ceea 100644
--- a/be/src/util/mysql_load_error_hub.cpp
+++ b/be/src/util/mysql_load_error_hub.cpp
@@ -70,7 +70,7 @@ Status MysqlLoadErrorHub::write_mysql() {
return st;
}
- DeferOp close_mysql_conn(std::bind<void>(&mysql_close, my_conn));
+ Defer close_mysql_conn{[=]() { mysql_close(my_conn); }};
Status status;
std::stringstream sql_stream;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]