This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1f236a5 [BUG] Fix core when schema change (#5018)
1f236a5 is described below
commit 1f236a533900c63360d8f20df83e51b19d3cd519
Author: Zhengguo Yang <[email protected]>
AuthorDate: Fri Dec 4 09:53:19 2020 +0800
[BUG] Fix core when schema change (#5018)
---
be/src/olap/collect_iterator.cpp | 17 ++++++-------
be/src/olap/collect_iterator.h | 2 --
be/src/olap/reader.cpp | 53 +++++++++++++++++++++++++++++++---------
be/src/olap/reader.h | 8 +++---
4 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp
index fba50f3..c3e0ebd 100644
--- a/be/src/olap/collect_iterator.cpp
+++ b/be/src/olap/collect_iterator.cpp
@@ -40,12 +40,11 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader));
RETURN_NOT_OK(child->init());
if (child->current_row() == nullptr) {
- return OLAP_SUCCESS;
+ return OLAP_ERR_DATA_EOF;
}
LevelIterator* child_ptr = child.release();
_children.push_back(child_ptr);
- _rs_readers.push_back(rs_reader);
return OLAP_SUCCESS;
}
@@ -53,28 +52,28 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
// status will be used as the base rowset, and the other rowsets will be merged first and
// then merged with the base rowset.
void CollectIterator::build_heap() {
- DCHECK(_rs_readers.size() == _children.size());
+ DCHECK(_reader->_rs_readers.size() == _children.size());
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
if (_children.empty()) {
_inner_iter.reset(nullptr);
return;
} else if (_merge) {
- DCHECK(!_rs_readers.empty());
+ DCHECK(!_reader->_rs_readers.empty());
// build merge heap with two children, a base rowset as level0iterator and
// other cumulative rowsets as a level1iterator
if (_children.size() > 1) {
// find base rowset(max rownum),
- RowsetReaderSharedPtr base_reader = _rs_readers[0];
+ RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0];
int base_reader_idx = 0;
- for (size_t i = 1; i < _rs_readers.size(); ++i) {
- if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
+ for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) {
+ if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
base_reader->rowset()->rowset_meta()->num_rows()) {
- base_reader = _rs_readers[i];
+ base_reader = _reader->_rs_readers[i];
base_reader_idx = i;
}
}
std::vector<LevelIterator*> cumu_children;
- for (size_t i = 0; i < _rs_readers.size(); ++i) {
+ for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) {
if (i != base_reader_idx) {
cumu_children.push_back(_children[i]);
}
diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h
index 97a4834..173dba4 100644
--- a/be/src/olap/collect_iterator.h
+++ b/be/src/olap/collect_iterator.h
@@ -164,8 +164,6 @@ private:
// Hold reader point to access read params, such as fetch conditions.
Reader* _reader = nullptr;
- std::vector<RowsetReaderSharedPtr> _rs_readers;
-
};
} // namespace doris
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 43598e5..75f9789 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -90,7 +90,7 @@ std::string Reader::KeysParam::to_string() const {
return ss.str();
}
-Reader::Reader() {
+Reader::Reader() : _collect_iter(new CollectIterator()) {
_tracker.reset(new MemTracker(-1));
_predicate_mem_pool.reset(new MemPool(_tracker.get()));
}
@@ -119,14 +119,28 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
<< ", version:" << read_params.version;
return res;
}
-
- if (_rs_readers.size() == 1 &&
- !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) {
- _next_row_func = &Reader::_dup_key_next_row;
+ // When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
+ bool has_delete_rowset = false;
+ int nonoverlapping_count = 0;
+ for (auto rs_reader : _rs_readers) {
+ if (rs_reader->rowset()->rowset_meta()->delete_flag()) {
+ has_delete_rowset = true;
+ break;
+ }
+ if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 &&
+ !rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) {
+ if (++nonoverlapping_count > 1) {
+ break;
+ }
+ }
+ }
+ if (nonoverlapping_count == 1 && !has_delete_rowset) {
+ _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row
+ : &Reader::_direct_next_row;
} else {
switch (_tablet->keys_type()) {
case KeysType::DUP_KEYS:
- _next_row_func = &Reader::_dup_key_next_row;
+ _next_row_func = &Reader::_direct_next_row;
break;
case KeysType::UNIQUE_KEYS:
_next_row_func = &Reader::_unique_key_next_row;
@@ -143,8 +157,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
return OLAP_SUCCESS;
}
-OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
- bool* eof) {
+OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
+ bool* eof) {
if (UNLIKELY(_next_key == nullptr)) {
*eof = true;
return OLAP_SUCCESS;
@@ -158,6 +172,22 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, O
}
return OLAP_SUCCESS;
}
+OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool,
+ ObjectPool* agg_pool, bool* eof) {
+ if (UNLIKELY(_next_key == nullptr)) {
+ *eof = true;
+ return OLAP_SUCCESS;
+ }
+ init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool);
+ auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
+ if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
+ return res;
+ }
+ if (_need_agg_finalize) {
+ agg_finalize_row(_value_cids, row_cursor, mem_pool);
+ }
+ return OLAP_SUCCESS;
+}
OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof) {
@@ -254,8 +284,6 @@ void Reader::close() {
for (auto pred : _col_predicates) {
delete pred;
}
-
- delete _collect_iter;
}
OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
@@ -352,7 +380,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
LOG(WARNING) << "failed to add child to iterator";
return res;
}
- _rs_readers.push_back(rs_reader);
+ if (res == OLAP_SUCCESS) {
+ _rs_readers.push_back(rs_reader);
+ }
}
_collect_iter->build_heap();
_next_key = _collect_iter->current_row(&_next_delete_flag);
@@ -390,7 +420,6 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
_init_seek_columns();
- _collect_iter = new CollectIterator();
_collect_iter->init(this);
if (_tablet->tablet_schema().has_sequence_col()) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 0cd7142..9b2c911 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -145,8 +145,10 @@ private:
void _init_load_bf_columns(const ReaderParams& read_params);
- OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
- bool* eof);
+ OLAPStatus _direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
+ bool* eof);
+ OLAPStatus _direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool,
+ ObjectPool* agg_pool, bool* eof);
OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof);
OLAPStatus _unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
@@ -183,7 +185,7 @@ private:
bool _has_sequence_col = false;
int32_t _sequence_col_idx = -1;
const RowCursor* _next_key = nullptr;
- CollectIterator* _collect_iter = nullptr;
+ std::unique_ptr<CollectIterator> _collect_iter;
std::vector<uint32_t> _key_cids;
std::vector<uint32_t> _value_cids;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]