This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2306e46658 [Enhancement](compaction) reduce VMergeIterator copy block
(#12316)
2306e46658 is described below
commit 2306e46658681a1d294975d810c82c1e209d016b
Author: Pxl <[email protected]>
AuthorDate: Tue Sep 13 16:19:34 2022 +0800
[Enhancement](compaction) reduce VMergeIterator copy block (#12316)
This PR makes VMergeIterator support returning row references instead of
copying a full block.
---
be/src/olap/compaction.cpp | 4 +-
be/src/olap/iterators.h | 10 +-
be/src/olap/rowset/beta_rowset_reader.cpp | 26 ++-
be/src/olap/rowset/beta_rowset_reader.h | 2 +
be/src/olap/rowset/rowset_reader.h | 5 +-
be/src/vec/core/block.h | 19 +++
be/src/vec/olap/block_reader.h | 2 +-
be/src/vec/olap/vcollect_iterator.cpp | 90 ++++++-----
be/src/vec/olap/vcollect_iterator.h | 64 ++++++--
be/src/vec/olap/vgeneric_iterators.cpp | 259 +++++++++++++++++-------------
10 files changed, 306 insertions(+), 175 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8f9e8910d9..2e9b4a6b55 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -236,7 +236,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
<< ". elapsed time=" << watch.get_elapse_second()
<< "s. cumulative_compaction_policy="
<< _tablet->cumulative_compaction_policy()->name()
- << ", compact_row_per_second=" << _input_row_num /
watch.get_elapse_second();
+ << ", compact_row_per_second=" << int(_input_row_num /
watch.get_elapse_second());
return Status::OK();
}
@@ -336,7 +336,7 @@ Status Compaction::check_correctness(const
Merger::Statistics& stats) {
// 1. check row number
if (_input_row_num != _output_rowset->num_rows() + stats.merged_rows +
stats.filtered_rows) {
LOG(WARNING) << "row_num does not match between cumulative input and
output! "
- << "input_row_num=" << _input_row_num
+ << "tablet=" << _tablet->full_name() << ",
input_row_num=" << _input_row_num
<< ", merged_row_num=" << stats.merged_rows
<< ", filtered_row_num=" << stats.filtered_rows
<< ", output_row_num=" << _output_rowset->num_rows();
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 3d9690f70b..22f081d0eb 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -94,8 +94,8 @@ public:
// Used to read data in RowBlockV2 one by one
class RowwiseIterator {
public:
- RowwiseIterator() {}
- virtual ~RowwiseIterator() {}
+ RowwiseIterator() = default;
+ virtual ~RowwiseIterator() = default;
// Initialize this iterator and make it ready to read with
// input options.
@@ -116,6 +116,12 @@ public:
return Status::NotSupported("to be implemented");
}
+ virtual Status next_block_view(vectorized::BlockView* block_view) {
+ return Status::NotSupported("to be implemented");
+ }
+
+ virtual bool support_return_data_by_ref() { return false; }
+
virtual Status current_block_row_locations(std::vector<RowLocation>*
block_row_locations) {
return Status::NotSupported("to be implemented");
}
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 17223c9eb6..df15b72f62 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -24,7 +24,6 @@
#include "olap/row_block.h"
#include "olap/row_block2.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/segment_v2/segment_iterator.h"
#include "olap/schema.h"
#include "olap/tablet_meta.h"
#include "vec/core/block.h"
@@ -119,7 +118,9 @@ Status BetaRowsetReader::init(RowsetReaderContext*
read_context) {
for (uint32_t seg_id = 0; seg_id < rowset()->num_segments(); ++seg_id)
{
auto d = read_context->delete_bitmap->get_agg(
{rowset_id, seg_id, read_context->version.second});
- if (d->isEmpty()) continue; // Empty delete bitmap for the segment
+ if (d->isEmpty()) {
+ continue; // Empty delete bitmap for the segment
+ }
VLOG_TRACE << "Get the delete bitmap for rowset: " <<
rowset_id.to_string()
<< ", segment id:" << seg_id << ", size:" <<
d->cardinality();
read_options.delete_bitmap.emplace(seg_id, std::move(d));
@@ -323,6 +324,27 @@ Status BetaRowsetReader::next_block(vectorized::Block*
block) {
return Status::OK();
}
+Status BetaRowsetReader::next_block_view(vectorized::BlockView* block_view) {
+ SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
+ if (config::enable_storage_vectorization && _context->is_vec) {
+ do {
+ auto s = _iterator->next_block_view(block_view);
+ if (!s.ok()) {
+ if (s.is_end_of_file()) {
+ return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+ } else {
+ LOG(WARNING) << "failed to read next block: " <<
s.to_string();
+ return
Status::OLAPInternalError(OLAP_ERR_ROWSET_READ_FAILED);
+ }
+ }
+ } while (block_view->empty());
+ } else {
+ return Status::NotSupported("block view only support
enable_storage_vectorization");
+ }
+
+ return Status::OK();
+}
+
bool BetaRowsetReader::_should_push_down_value_predicates() const {
// if unique table with rowset [0-x] or [0-1] [2-y] [...],
// value column predicates can be pushdown on rowset [0-x] or [2-y], [2-y]
must be compaction and not overlapping
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index b987efc9ad..5424722c16 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -38,6 +38,8 @@ public:
// It's ok, because we only get ref here, the block's owner is this reader.
Status next_block(RowBlock** block) override;
Status next_block(vectorized::Block* block) override;
+ Status next_block_view(vectorized::BlockView* block_view) override;
+ bool support_return_data_by_ref() override { return
_iterator->support_return_data_by_ref(); }
bool delete_flag() override { return _rowset->delete_flag(); }
diff --git a/be/src/olap/rowset/rowset_reader.h
b/be/src/olap/rowset/rowset_reader.h
index 75f780b953..eecf594254 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -38,7 +38,7 @@ using RowsetReaderSharedPtr = std::shared_ptr<RowsetReader>;
class RowsetReader {
public:
- virtual ~RowsetReader() {}
+ virtual ~RowsetReader() = default;
// reader init
virtual Status init(RowsetReaderContext* read_context) = 0;
@@ -52,6 +52,9 @@ public:
virtual Status next_block(vectorized::Block* block) = 0;
+ virtual Status next_block_view(vectorized::BlockView* block_view) = 0;
+ virtual bool support_return_data_by_ref() { return false; }
+
virtual bool delete_flag() = 0;
virtual Version version() = 0;
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index eb229cd173..aa603a1800 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -504,5 +504,24 @@ public:
}
};
+struct IteratorRowRef {
+ std::shared_ptr<Block> block;
+ int row_pos;
+ bool is_same;
+
+ template <typename T>
+ int compare(const IteratorRowRef& rhs, const T& compare_arguments) const {
+ return block->compare_at(row_pos, rhs.row_pos, compare_arguments,
*rhs.block, -1);
+ }
+
+ void reset() {
+ block = nullptr;
+ row_pos = -1;
+ is_same = false;
+ }
+};
+
+using BlockView = std::vector<IteratorRowRef>;
+
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 0a4a8f807c..682caed065 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -99,7 +99,7 @@ private:
std::vector<bool> _stored_has_null_tag;
std::vector<bool> _stored_has_string_tag;
- phmap::flat_hash_map<const Block*, std::vector<std::pair<int16_t,
int16_t>>> _temp_ref_map;
+ phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>>
_temp_ref_map;
bool _eof = false;
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index 159e59be6b..7d4560f107 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -17,15 +17,12 @@
#include "vec/olap/vcollect_iterator.h"
-#include <memory>
-
-#include "olap/rowset/beta_rowset_reader.h"
+#include "common/status.h"
+#include "util/defer_op.h"
namespace doris {
namespace vectorized {
-VCollectIterator::~VCollectIterator() {}
-
#define RETURN_IF_NOT_EOF_AND_OK(stmt)
\
do {
\
const Status& _status_ = (stmt);
\
@@ -68,9 +65,10 @@ Status
VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
return Status::OK();
} else if (_merge) {
DCHECK(!rs_readers.empty());
+ bool have_multiple_child = false;
for (auto [c_iter, r_iter] = std::pair {_children.begin(),
rs_readers.begin()};
c_iter != _children.end();) {
- auto s = (*c_iter)->init();
+ auto s = (*c_iter)->init(have_multiple_child);
if (!s.ok()) {
delete (*c_iter);
c_iter = _children.erase(c_iter);
@@ -79,6 +77,7 @@ Status
VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
return s;
}
} else {
+ have_multiple_child = true;
++c_iter;
++r_iter;
}
@@ -135,11 +134,8 @@ bool
VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, L
const IteratorRowRef& rhs_ref = *rhs->current_row_ref();
int cmp_res = UNLIKELY(lhs->compare_columns())
- ? lhs_ref.block->compare_at(lhs_ref.row_pos,
rhs_ref.row_pos,
- lhs->compare_columns(),
*rhs_ref.block, -1)
- : lhs_ref.block->compare_at(lhs_ref.row_pos,
rhs_ref.row_pos,
-
lhs->tablet_schema().num_key_columns(),
- *rhs_ref.block, -1);
+ ? lhs_ref.compare(rhs_ref, lhs->compare_columns())
+ : lhs_ref.compare(rhs_ref,
lhs->tablet_schema().num_key_columns());
if (cmp_res != 0) {
return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0;
}
@@ -192,15 +188,22 @@
VCollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader
TabletReader* reader)
: LevelIterator(reader), _rs_reader(rs_reader), _reader(reader) {
DCHECK_EQ(RowsetTypePB::BETA_ROWSET, rs_reader->type());
- _block = std::make_shared<Block>(_schema.create_block(
- _reader->_return_columns,
_reader->_tablet_columns_convert_to_null_set));
- _ref.block = _block;
- _ref.row_pos = 0;
- _ref.is_same = false;
}
-Status VCollectIterator::Level0Iterator::init() {
- return _refresh_current_row();
+Status VCollectIterator::Level0Iterator::init(bool get_data_by_ref) {
+ _get_data_by_ref = get_data_by_ref &&
_rs_reader->support_return_data_by_ref() &&
+ config::enable_storage_vectorization;
+ if (!_get_data_by_ref) {
+ _block = std::make_shared<Block>(_schema.create_block(
+ _reader->_return_columns,
_reader->_tablet_columns_convert_to_null_set));
+ }
+ auto st = _refresh_current_row();
+ if (_get_data_by_ref && _block_view.size()) {
+ _ref = _block_view[0];
+ } else {
+ _ref = {_block, 0, false};
+ }
+ return st;
}
int64_t VCollectIterator::Level0Iterator::version() const {
@@ -209,42 +212,50 @@ int64_t VCollectIterator::Level0Iterator::version() const
{
Status VCollectIterator::Level0Iterator::_refresh_current_row() {
do {
- if (_block->rows() != 0 && _ref.row_pos < _block->rows()) {
+ if (!_is_empty() && _current_valid()) {
return Status::OK();
} else {
- _ref.is_same = false;
- _ref.row_pos = 0;
- _block->clear_column_data();
- auto res = _rs_reader->next_block(_block.get());
+ _reset();
+ auto res = _refresh();
if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) {
return res;
}
- if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() ==
0) {
- _ref.row_pos = -1;
- return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+ if (res.precise_code() == OLAP_ERR_DATA_EOF && _is_empty()) {
+ break;
}
if (UNLIKELY(_reader->_reader_context.record_rowids)) {
RETURN_NOT_OK(_rs_reader->current_block_row_locations(&_block_row_locations));
- DCHECK_EQ(_block_row_locations.size(), _block->rows());
}
}
- } while (_block->rows() != 0);
+ } while (!_is_empty());
_ref.row_pos = -1;
+ _current = -1;
return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
}
Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
- _ref.row_pos++;
+ if (_get_data_by_ref) {
+ _current++;
+ } else {
+ _ref.row_pos++;
+ }
+
RETURN_NOT_OK(_refresh_current_row());
+
+ if (_get_data_by_ref) {
+ _ref = _block_view[_current];
+ }
+
*ref = _ref;
return Status::OK();
}
Status VCollectIterator::Level0Iterator::next(Block* block) {
- if (UNLIKELY(_ref.block->rows() > 0 && _ref.row_pos == 0)) {
+ CHECK(!_get_data_by_ref);
+ if (_ref.row_pos == 0 && _ref.block != nullptr &&
UNLIKELY(_ref.block->rows() > 0)) {
block->swap(*_ref.block);
- _ref.row_pos = -1;
+ _ref.reset();
return Status::OK();
} else {
auto res = _rs_reader->next_block(block);
@@ -262,7 +273,7 @@ Status VCollectIterator::Level0Iterator::next(Block* block)
{
}
RowLocation VCollectIterator::Level0Iterator::current_row_location() {
- RowLocation& segment_row_id = _block_row_locations[_ref.row_pos];
+ RowLocation& segment_row_id = _block_row_locations[_get_data_by_ref ?
_current : _ref.row_pos];
return RowLocation(_rs_reader->rowset()->rowset_id(),
segment_row_id.segment_id,
segment_row_id.row_id);
}
@@ -287,7 +298,7 @@ VCollectIterator::Level1Iterator::Level1Iterator(
_merge(merge),
_is_reverse(is_reverse),
_skip_same(skip_same) {
- _ref.row_pos = -1; // represent eof
+ _ref.reset();
_batch_size = reader->_batch_size;
}
@@ -303,7 +314,9 @@ VCollectIterator::Level1Iterator::~Level1Iterator() {
while (!_heap->empty()) {
auto child = _heap->top();
_heap->pop();
- if (child) delete child;
+ if (child) {
+ delete child;
+ }
}
}
}
@@ -315,7 +328,7 @@ VCollectIterator::Level1Iterator::~Level1Iterator() {
// Others when error happens
Status VCollectIterator::Level1Iterator::next(IteratorRowRef* ref) {
if (UNLIKELY(_cur_child == nullptr)) {
- _ref.row_pos = -1;
+ _ref.reset();
return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
}
if (_merge) {
@@ -348,7 +361,7 @@ int64_t VCollectIterator::Level1Iterator::version() const {
return -1;
}
-Status VCollectIterator::Level1Iterator::init() {
+Status VCollectIterator::Level1Iterator::init(bool get_data_by_ref) {
if (_children.empty()) {
return Status::OK();
}
@@ -392,11 +405,12 @@ Status
VCollectIterator::Level1Iterator::_merge_next(IteratorRowRef* ref) {
if (!_heap->empty()) {
_cur_child = _heap->top();
} else {
+ _ref.reset();
_cur_child = nullptr;
- _ref.row_pos = -1;
return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
}
} else {
+ _ref.reset();
_cur_child = nullptr;
LOG(WARNING) << "failed to get next from child, res=" << res;
return res;
@@ -465,7 +479,7 @@ Status VCollectIterator::Level1Iterator::_merge_next(Block*
block) {
pre_row_ref.row_pos,
continuous_row_in_block);
}
continuous_row_in_block = 0;
- pre_row_ref.block = nullptr;
+ pre_row_ref.reset();
}
auto res = _merge_next(&cur_row);
if (UNLIKELY(res.precise_code() == OLAP_ERR_DATA_EOF)) {
diff --git a/be/src/vec/olap/vcollect_iterator.h
b/be/src/vec/olap/vcollect_iterator.h
index eabb0ad2e5..69ef3da70a 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -17,13 +17,13 @@
#pragma once
+#include "common/status.h"
#ifdef USE_LIBCPP
#include <queue>
#else
#include <ext/pb_ds/priority_queue.hpp>
#endif
-#include "olap/olap_define.h"
#include "olap/reader.h"
#include "olap/rowset/rowset_reader.h"
#include "vec/core/block.h"
@@ -34,16 +34,10 @@ class TabletSchema;
namespace vectorized {
-struct IteratorRowRef {
- std::shared_ptr<Block> block;
- int16_t row_pos;
- bool is_same;
-};
-
class VCollectIterator {
public:
// Hold reader point to get reader params
- ~VCollectIterator();
+ ~VCollectIterator() = default;
void init(TabletReader* reader, bool force_merge, bool is_reverse);
@@ -83,7 +77,7 @@ private:
: _schema(reader->tablet_schema()),
_compare_columns(reader->_reader_context.read_orderby_key_columns) {};
- virtual Status init() = 0;
+ virtual Status init(bool get_data_by_ref = false) = 0;
virtual int64_t version() const = 0;
@@ -95,7 +89,7 @@ private:
void set_same(bool same) { _ref.is_same = same; }
- bool is_same() { return _ref.is_same; }
+ bool is_same() const { return _ref.is_same; }
virtual ~LevelIterator() = default;
@@ -140,9 +134,9 @@ private:
class Level0Iterator : public LevelIterator {
public:
Level0Iterator(RowsetReaderSharedPtr rs_reader, TabletReader* reader);
- ~Level0Iterator() {}
+ ~Level0Iterator() override = default;
- Status init() override;
+ Status init(bool get_data_by_ref = false) override;
int64_t version() const override;
@@ -156,11 +150,53 @@ private:
private:
Status _refresh_current_row();
+ Status _next_by_ref(IteratorRowRef* ref);
+ Status _refresh_current_row_by_ref();
+
+ bool _is_empty() {
+ if (_get_data_by_ref) {
+ return _block_view.empty();
+ } else {
+ return _block->rows() == 0;
+ }
+ }
+
+ bool _current_valid() {
+ if (_get_data_by_ref) {
+ return _current < _block_view.size();
+ } else {
+ return _ref.row_pos < _block->rows();
+ }
+ }
+
+ void _reset() {
+ if (_get_data_by_ref) {
+ _block_view.clear();
+ _ref.reset();
+ _current = 0;
+ } else {
+ _ref.is_same = false;
+ _ref.row_pos = 0;
+ _block->clear_column_data();
+ }
+ }
+
+ Status _refresh() {
+ if (_get_data_by_ref) {
+ return _rs_reader->next_block_view(&_block_view);
+ } else {
+ return _rs_reader->next_block(_block.get());
+ }
+ }
RowsetReaderSharedPtr _rs_reader;
TabletReader* _reader = nullptr;
std::shared_ptr<Block> _block;
+
+ int _current;
+ BlockView _block_view;
std::vector<RowLocation> _block_row_locations;
+ bool _get_data_by_ref = false;
};
// Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or
mixed)
@@ -169,7 +205,7 @@ private:
Level1Iterator(const std::list<LevelIterator*>& children,
TabletReader* reader, bool merge,
bool is_reverse, bool skip_same);
- Status init() override;
+ Status init(bool get_data_by_ref = false) override;
int64_t version() const override;
@@ -181,7 +217,7 @@ private:
Status current_block_row_locations(std::vector<RowLocation>*
block_row_locations) override;
- ~Level1Iterator();
+ ~Level1Iterator() override;
private:
Status _merge_next(IteratorRowRef* ref);
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index 9f50040b2e..280d7b054a 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+#include <memory>
#include <queue>
#include <utility>
+#include "common/status.h"
#include "olap/iterators.h"
-#include "olap/row.h"
-#include "olap/row_block2.h"
+#include "olap/schema.h"
+#include "vec/core/block.h"
namespace doris {
@@ -44,17 +46,17 @@ public:
// Will generate num_rows rows in total
VAutoIncrementIterator(const Schema& schema, size_t num_rows)
: _schema(schema), _num_rows(num_rows), _rows_returned() {}
- ~VAutoIncrementIterator() override {}
+ ~VAutoIncrementIterator() override = default;
// NOTE: Currently, this function will ignore StorageReadOptions
Status init(const StorageReadOptions& opts) override;
- Status next_batch(vectorized::Block* block) override {
+ Status next_batch(Block* block) override {
int row_idx = 0;
while (_rows_returned < _num_rows) {
for (int j = 0; j < _schema.num_columns(); ++j) {
- vectorized::ColumnWithTypeAndName& vc =
block->get_by_position(j);
- vectorized::IColumn& vi = (vectorized::IColumn&)(*vc.column);
+ ColumnWithTypeAndName& vc = block->get_by_position(j);
+ IColumn& vi = (IColumn&)(*vc.column);
char data[16] = {};
size_t data_len = 0;
@@ -91,7 +93,9 @@ public:
++_rows_returned;
}
- if (row_idx > 0) return Status::OK();
+ if (row_idx > 0) {
+ return Status::OK();
+ }
return Status::EndOfFile("End of VAutoIncrementIterator");
}
@@ -139,8 +143,8 @@ public:
_iter = nullptr;
}
- Status block_reset() {
- if (!_block) {
+ Status block_reset(const std::shared_ptr<Block>& block) {
+ if (!*block) {
const Schema& schema = _iter->schema();
const auto& column_ids = schema.column_ids();
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
@@ -151,11 +155,11 @@ public:
}
auto column = data_type->create_column();
column->reserve(_block_row_max);
- _block.insert(
+ block->insert(
ColumnWithTypeAndName(std::move(column), data_type,
column_desc->name()));
}
} else {
- _block.clear_column_data();
+ block->clear_column_data();
}
return Status::OK();
}
@@ -165,10 +169,10 @@ public:
bool compare(const VMergeIteratorContext& rhs) const {
int cmp_res = UNLIKELY(_compare_columns)
- ? this->_block.compare_at(_index_in_block,
rhs._index_in_block,
- _compare_columns,
rhs._block, -1)
- : this->_block.compare_at(_index_in_block,
rhs._index_in_block,
- _num_key_columns,
rhs._block, -1);
+ ? _block->compare_at(_index_in_block,
rhs._index_in_block,
+ _compare_columns,
*rhs._block, -1)
+ : _block->compare_at(_index_in_block,
rhs._index_in_block,
+ _num_key_columns,
*rhs._block, -1);
if (cmp_res != 0) {
return UNLIKELY(_is_reverse) ? cmp_res < 0 : cmp_res > 0;
@@ -176,47 +180,52 @@ public:
auto col_cmp_res = 0;
if (_sequence_id_idx != -1) {
- col_cmp_res = this->_block.compare_column_at(_index_in_block,
rhs._index_in_block,
- _sequence_id_idx,
rhs._block, -1);
+ col_cmp_res = _block->compare_column_at(_index_in_block,
rhs._index_in_block,
+ _sequence_id_idx,
*rhs._block, -1);
}
- auto result = col_cmp_res == 0 ? this->data_id() < rhs.data_id() :
col_cmp_res < 0;
+ auto result = col_cmp_res == 0 ? data_id() < rhs.data_id() :
col_cmp_res < 0;
if (_is_unique) {
- result ? this->set_skip(true) : rhs.set_skip(true);
+ result ? set_skip(true) : rhs.set_skip(true);
}
return result;
}
- // there is two situation in copy_rows:
- // 1... `advanced = false` when current block finished, we should copy
block before advance(iterator)
- // If we iterator a block from start to end, _index_in_block=rows()-1, and
_cur_batch_num=rows,
- // so we should copy from (_index_in_block - _cur_batch_num + 1)
-
- // 2... `advanced = true` when current block not finished and we
advanced to next block, now
- // cur_batch_num = (pre_block iteraotr num) + 1, but actually pre_block
iterator num is cur_batch_num -1
- // so we have a ` if (advanced) start -- `
- void copy_rows(vectorized::Block* block, bool advanced = true) {
- vectorized::Block& src = _block;
- vectorized::Block& dst = *block;
+ // `advanced = false` when current block finished
+ void copy_rows(Block* block, bool advanced = true) {
+ Block& src = *_block;
+ Block& dst = *block;
if (_cur_batch_num == 0) {
return;
}
+ // copy a row to dst block column by column
+ size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
+ DCHECK(start >= 0);
+
for (size_t i = 0; i < _num_columns; ++i) {
auto& s_col = src.get_by_position(i);
auto& d_col = dst.get_by_position(i);
- vectorized::ColumnPtr& s_cp = s_col.column;
- vectorized::ColumnPtr& d_cp = d_col.column;
+ ColumnPtr& s_cp = s_col.column;
+ ColumnPtr& d_cp = d_col.column;
- //copy a row to dst block column by column
- size_t start = _index_in_block - _cur_batch_num + 1;
- if (advanced) {
- start--;
- }
- DCHECK(start >= 0);
- ((vectorized::IColumn&)(*d_cp)).insert_range_from(*s_cp, start,
_cur_batch_num);
+ d_cp->assume_mutable()->insert_range_from(*s_cp, start,
_cur_batch_num);
+ }
+ _cur_batch_num = 0;
+ }
+
+ void copy_rows(BlockView* view, bool advanced = true) {
+ if (_cur_batch_num == 0) {
+ return;
+ }
+ size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
+ DCHECK(start >= 0);
+
+ for (size_t i = 0; i < _cur_batch_num; ++i) {
+ view->push_back({_block, static_cast<int>(start + i), false});
}
+
_cur_batch_num = 0;
}
@@ -245,12 +254,7 @@ public:
void reset_cur_batch() { _cur_batch_num = 0; }
- bool is_cur_block_finished() {
- if (_index_in_block == _block.rows() - 1) {
- return true;
- }
- return false;
- }
+ bool is_cur_block_finished() { return _index_in_block == _block->rows() -
1; }
private:
// Load next block into _block
@@ -258,9 +262,6 @@ private:
RowwiseIterator* _iter;
- // used to store data load from iterator->next_batch(Vectorized::Block*)
- vectorized::Block _block;
-
int _sequence_id_idx = -1;
bool _is_unique = false;
bool _is_reverse = false;
@@ -275,13 +276,17 @@ private:
std::vector<RowLocation> _block_row_locations;
bool _record_rowids = false;
size_t _cur_batch_num = 0;
+
+ // used to store data load from iterator->next_batch(Block*)
+ std::shared_ptr<Block> _block;
+ // used to store data still on block view
+ std::list<std::shared_ptr<Block>> _block_list;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
_block_row_max = opts.block_row_max;
_record_rowids = opts.record_rowids;
RETURN_IF_ERROR(_iter->init(opts));
- RETURN_IF_ERROR(block_reset());
RETURN_IF_ERROR(_load_next_block());
if (valid()) {
RETURN_IF_ERROR(advance());
@@ -294,7 +299,7 @@ Status VMergeIteratorContext::advance() {
// NOTE: we increase _index_in_block directly to valid one check
do {
_index_in_block++;
- if (LIKELY(_index_in_block < _block.rows())) {
+ if (LIKELY(_index_in_block < _block->rows())) {
return Status::OK();
}
// current batch has no data, load next batch
@@ -305,8 +310,23 @@ Status VMergeIteratorContext::advance() {
Status VMergeIteratorContext::_load_next_block() {
do {
- block_reset();
- Status st = _iter->next_batch(&_block);
+ if (_block != nullptr) {
+ _block_list.push_back(_block);
+ _block = nullptr;
+ }
+ for (auto it = _block_list.begin(); it != _block_list.end(); it++) {
+ if (it->use_count() == 1) {
+ block_reset(*it);
+ _block = *it;
+ _block_list.erase(it);
+ break;
+ }
+ }
+ if (_block == nullptr) {
+ _block = std::make_shared<Block>();
+ block_reset(_block);
+ }
+ Status st = _iter->next_batch(_block.get());
if (!st.ok()) {
_valid = false;
if (st.is_end_of_file()) {
@@ -318,7 +338,7 @@ Status VMergeIteratorContext::_load_next_block() {
if (UNLIKELY(_record_rowids)) {
RETURN_IF_ERROR(_iter->current_block_row_locations(&_block_row_locations));
}
- } while (_block.rows() == 0);
+ } while (_block->rows() == 0);
_index_in_block = -1;
_valid = true;
return Status::OK();
@@ -345,7 +365,10 @@ public:
Status init(const StorageReadOptions& opts) override;
- Status next_batch(vectorized::Block* block) override;
+ Status next_batch(Block* block) override { return _next_batch(block); }
+ Status next_block_view(BlockView* block_view) override { return
_next_batch(block_view); }
+
+ bool support_return_data_by_ref() override { return true; }
const Schema& schema() const override { return *_schema; }
@@ -356,6 +379,71 @@ public:
}
private:
+ int _get_size(Block* block) { return block->rows(); }
+ int _get_size(BlockView* block_view) { return block_view->size(); }
+
+ template <typename T>
+ Status _next_batch(T* block) {
+ if (UNLIKELY(_record_rowids)) {
+ _block_row_locations.resize(_block_row_max);
+ }
+ size_t row_idx = 0;
+ VMergeIteratorContext* pre_ctx = nullptr;
+ while (_get_size(block) < _block_row_max) {
+ if (_merge_heap.empty()) {
+ break;
+ }
+
+ auto ctx = _merge_heap.top();
+ _merge_heap.pop();
+
+ if (!ctx->need_skip()) {
+ ctx->add_cur_batch();
+ if (pre_ctx != ctx) {
+ if (pre_ctx) {
+ pre_ctx->copy_rows(block);
+ }
+ pre_ctx = ctx;
+ }
+ if (UNLIKELY(_record_rowids)) {
+ _block_row_locations[row_idx] =
ctx->current_row_location();
+ }
+ row_idx++;
+ if (ctx->is_cur_block_finished() || row_idx >= _block_row_max)
{
+ // current block finished, ctx not advance
+ // so copy start_idx = (_index_in_block - _cur_batch_num +
1)
+ ctx->copy_rows(block, false);
+ pre_ctx = nullptr;
+ }
+ } else if (_merged_rows != nullptr) {
+ (*_merged_rows)++;
+ // need skip cur row, so flush rows in pre_ctx
+ if (pre_ctx) {
+ pre_ctx->copy_rows(block);
+ pre_ctx = nullptr;
+ }
+ }
+
+ RETURN_IF_ERROR(ctx->advance());
+ if (ctx->valid()) {
+ _merge_heap.push(ctx);
+ } else {
+ // Release ctx earlier to reduce resource consumed
+ delete ctx;
+ }
+ }
+ if (!_merge_heap.empty()) {
+ return Status::OK();
+ }
+ // Still last batch needs to be processed
+
+ if (UNLIKELY(_record_rowids)) {
+ _block_row_locations.resize(row_idx);
+ }
+
+ return Status::EndOfFile("no more data in segment");
+ }
+
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
@@ -406,65 +494,6 @@ Status VMergeIterator::init(const StorageReadOptions&
opts) {
return Status::OK();
}
-Status VMergeIterator::next_batch(vectorized::Block* block) {
- if (UNLIKELY(_record_rowids)) {
- _block_row_locations.resize(_block_row_max);
- }
- size_t row_idx = 0;
- VMergeIteratorContext* pre_ctx = nullptr;
- while (block->rows() < _block_row_max) {
- if (_merge_heap.empty()) break;
-
- auto ctx = _merge_heap.top();
- _merge_heap.pop();
-
- if (!ctx->need_skip()) {
- ctx->add_cur_batch();
- if (pre_ctx != ctx) {
- if (pre_ctx) {
- pre_ctx->copy_rows(block);
- }
- pre_ctx = ctx;
- }
- if (UNLIKELY(_record_rowids)) {
- _block_row_locations[row_idx] = ctx->current_row_location();
- }
- row_idx++;
- if (ctx->is_cur_block_finished() || row_idx >= _block_row_max) {
- // current block finished, ctx not advance
- // so copy start_idx = (_index_in_block - _cur_batch_num + 1)
- ctx->copy_rows(block, false);
- pre_ctx = nullptr;
- }
- } else if (_merged_rows != nullptr) {
- (*_merged_rows)++;
- // need skip cur row, so flush rows in pre_ctx
- if (pre_ctx) {
- pre_ctx->copy_rows(block);
- pre_ctx = nullptr;
- }
- }
-
- RETURN_IF_ERROR(ctx->advance());
- if (ctx->valid()) {
- _merge_heap.push(ctx);
- } else {
- // Release ctx earlier to reduce resource consumed
- delete ctx;
- }
- }
- if (!_merge_heap.empty()) {
- return Status::OK();
- }
- // Still last batch needs to be processed
-
- if (UNLIKELY(_record_rowids)) {
- _block_row_locations.resize(row_idx);
- }
-
- return Status::EndOfFile("no more data in segment");
-}
-
// VUnionIterator will read data from input iterator one by one.
class VUnionIterator : public RowwiseIterator {
public:
@@ -480,7 +509,7 @@ public:
Status init(const StorageReadOptions& opts) override;
- Status next_batch(vectorized::Block* block) override;
+ Status next_batch(Block* block) override;
const Schema& schema() const override { return *_schema; }
@@ -505,7 +534,7 @@ Status VUnionIterator::init(const StorageReadOptions& opts)
{
return Status::OK();
}
-Status VUnionIterator::next_batch(vectorized::Block* block) {
+Status VUnionIterator::next_batch(Block* block) {
while (_cur_iter != nullptr) {
auto st = _cur_iter->next_batch(block);
if (st.is_end_of_file()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]