This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new dbfe8e4 [enhancement] Optimize load CSV file memory allocate (#6174)
dbfe8e4 is described below
commit dbfe8e4753475ba1bfd490dfdae7003c62e32a86
Author: Zhengguo Yang <[email protected]>
AuthorDate: Mon Jul 12 09:58:45 2021 +0800
[enhancement] Optimize load CSV file memory allocate (#6174)
Optimize memory allocation when loading CSV files by avoiding frequent allocations;
this may reduce load time by 40%-50% when the number of columns is large.
---
be/src/exec/broker_scanner.cpp | 25 ++++++++++---------
be/src/exec/broker_scanner.h | 6 ++---
be/src/exec/tablet_info.h | 6 ++---
be/test/exec/multi_bytes_separator_test.cpp | 38 +++++++++++++----------------
4 files changed, 36 insertions(+), 39 deletions(-)
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 6287705..b275647 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -40,7 +40,7 @@
#include "util/utf8_check.h"
#if defined(__x86_64__)
- #include "exec/hdfs_file_reader.h"
+#include "exec/hdfs_file_reader.h"
#endif
namespace doris {
@@ -75,6 +75,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
_line_delimiter.push_back(static_cast<char>(params.line_delimiter));
_line_delimiter_length = 1;
}
+ _split_values.reserve(sizeof(Slice) * params.src_slot_ids.size());
}
BrokerScanner::~BrokerScanner() {
@@ -323,7 +324,8 @@ void BrokerScanner::close() {
}
}
-void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
+void BrokerScanner::split_line(const Slice& line) {
+ _split_values.clear();
const char* value = line.data;
size_t start = 0; // point to the start pos of next col value.
size_t curpos = 0; // point to the start pos of separator matching sequence.
@@ -348,7 +350,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
p1++;
if (p1 == _value_separator_length) {
// Match a separator
- values->emplace_back(value + start, curpos - start);
+ _split_values.emplace_back(value + start, curpos - start);
start = curpos + _value_separator_length;
curpos = start;
p1 = 0;
@@ -357,7 +359,7 @@ void BrokerScanner::split_line(const Slice& line, std::vector<Slice>* values) {
}
CHECK(curpos == line.size) << curpos << " vs " << line.size;
- values->emplace_back(value + start, curpos - start);
+ _split_values.emplace_back(value + start, curpos - start);
}
void BrokerScanner::fill_fix_length_string(const Slice& value, MemPool* pool,
char** new_value_p,
@@ -460,26 +462,25 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
return false;
}
- std::vector<Slice> values;
- { split_line(line, &values); }
+ split_line(line);
// range of current file
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
const std::vector<std::string>& columns_from_path =
range.columns_from_path;
- if (values.size() + columns_from_path.size() < _src_slot_descs.size()) {
+ if (_split_values.size() + columns_from_path.size() <
_src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is less than schema column number. "
- << "actual number: " << values.size() << " column separator:
["
+ << "actual number: " << _split_values.size() << " column
separator: ["
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
_state->append_error_msg_to_file(std::string(line.data, line.size),
error_msg.str());
_counter->num_rows_filtered++;
return false;
- } else if (values.size() + columns_from_path.size() >
_src_slot_descs.size()) {
+ } else if (_split_values.size() + columns_from_path.size() >
_src_slot_descs.size()) {
std::stringstream error_msg;
error_msg << "actual column number is more than schema column number. "
- << "actual number: " << values.size() << " column separator:
["
+ << "actual number: " << _split_values.size() << " column
separator: ["
<< _value_separator << "], "
<< "line delimiter: [" << _line_delimiter << "], "
<< "schema number: " << _src_slot_descs.size() << "; ";
@@ -488,9 +489,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) {
return false;
}
- for (int i = 0; i < values.size(); ++i) {
+ for (int i = 0; i < _split_values.size(); ++i) {
auto slot_desc = _src_slot_descs[i];
- const Slice& value = values[i];
+ const Slice& value = _split_values[i];
if (slot_desc->is_nullable() && is_null(value)) {
_src_tuple->set_null(slot_desc->null_indicator_offset());
continue;
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index da41f47..4578a77 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -55,8 +55,7 @@ public:
BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params, const
std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
- const std::vector<ExprContext*>&
pre_filter_ctxs,
- ScannerCounter* counter);
+ const std::vector<ExprContext*>& pre_filter_ctxs,
ScannerCounter* counter);
~BrokerScanner();
// Open this scanner, will initialize information need to
@@ -76,7 +75,7 @@ private:
Status open_next_reader();
// Split one text line to values
- void split_line(const Slice& line, std::vector<Slice>* values);
+ void split_line(const Slice& line);
void fill_fix_length_string(const Slice& value, MemPool* pool, char**
new_value_p,
int new_value_length);
@@ -118,6 +117,7 @@ private:
// used to hold current StreamLoadPipe
std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
+ std::vector<Slice> _split_values;
};
} // namespace doris
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index 59c2962..f88caec 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -137,7 +137,7 @@ public:
}
private:
- std::vector<SlotDescriptor*> _slot_descs;
+ const std::vector<SlotDescriptor*>& _slot_descs;
};
// store an olap table's tablet information
@@ -175,7 +175,7 @@ private:
}
OlapTablePartKeyComparator comparator(_partition_slot_descs);
const TOlapTablePartition& t_part = _t_param.partitions[0];
- // when list partition, return true if equals.
+ // when list partition, return true if equals.
if (t_part.__isset.in_keys) {
bool ret = false;
for (auto in_key : part->in_keys) {
@@ -185,7 +185,7 @@ private:
}
}
return ret;
- }
+ }
return !comparator(key, part->start_key);
}
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index c3fea0e..082e58f 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -54,7 +54,7 @@ TEST_F(MultiBytesSeparatorTest, normal) {
params.line_delimiter_str = "BBB";
params.column_separator_length = 4;
params.line_delimiter_length = 3;
-
+
const std::vector<TBrokerRangeDesc> ranges;
const std::vector<TNetworkAddress> broker_addresses;
const std::vector<ExprContext*> pre_filter_ctxs;
@@ -66,31 +66,28 @@ TEST_F(MultiBytesSeparatorTest, normal) {
{
std::string line = "AAAA";
Slice s(line);
- std::vector<Slice> values;
- scanner.split_line(s, &values);
- ASSERT_EQ(2, values.size());
- ASSERT_EQ(0, values[0].size);
- ASSERT_EQ(0, values[1].size);
+ scanner.split_line(s);
+ ASSERT_EQ(2, scanner._split_values.size());
+ ASSERT_EQ(0, scanner._split_values[0].size);
+ ASSERT_EQ(0, scanner._split_values[1].size);
}
// 2.
{
std::string line = "ABAA";
Slice s(line);
- std::vector<Slice> values;
- scanner.split_line(s, &values);
- ASSERT_EQ(1, values.size());
- ASSERT_EQ(4, values[0].size);
+ scanner.split_line(s);
+ ASSERT_EQ(1, scanner._split_values.size());
+ ASSERT_EQ(4, scanner._split_values[0].size);
}
// 3.
{
std::string line = "";
Slice s(line);
- std::vector<Slice> values;
- scanner.split_line(s, &values);
- ASSERT_EQ(1, values.size());
- ASSERT_EQ(0, values[0].size);
+ scanner.split_line(s);
+ ASSERT_EQ(1, scanner._split_values.size());
+ ASSERT_EQ(0, scanner._split_values[0].size);
}
// 4.
@@ -98,13 +95,12 @@ TEST_F(MultiBytesSeparatorTest, normal) {
// 1234, AAAB, , AA
std::string line = "1234AAAAAAABAAAAAAAAAA";
Slice s(line);
- std::vector<Slice> values;
- scanner.split_line(s, &values);
- ASSERT_EQ(4, values.size());
- ASSERT_EQ(4, values[0].size);
- ASSERT_EQ(4, values[1].size);
- ASSERT_EQ(0, values[2].size);
- ASSERT_EQ(2, values[3].size);
+ scanner.split_line(s);
+ ASSERT_EQ(4, scanner._split_values.size());
+ ASSERT_EQ(4, scanner._split_values[0].size);
+ ASSERT_EQ(4, scanner._split_values[1].size);
+ ASSERT_EQ(0, scanner._split_values[2].size);
+ ASSERT_EQ(2, scanner._split_values[3].size);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]