This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d25f0ba Make ColumnReader load lazily (#2026)
d25f0ba is described below
commit d25f0ba69a167efe95612bb9af403446ddb89a78
Author: Dayue Gao <[email protected]>
AuthorDate: Wed Oct 23 10:25:28 2019 +0800
Make ColumnReader load lazily (#2026)
[Storage][SegmentV2]
Currently `segment_v2::Segment::open` eagerly initializes all column
readers, regardless of whether a column is queried or not. Initializing
`segment_v2::ColumnReader` incurs additional I/O cost to read the ordinal index
and the zone map index, so that work should be delayed until it is needed.
---
be/src/olap/rowset/segment_v2/column_reader.cpp | 73 +++++++++++-----------
be/src/olap/rowset/segment_v2/column_reader.h | 67 +++++++++++++-------
be/src/olap/rowset/segment_v2/segment.cpp | 35 ++++-------
be/src/olap/rowset/segment_v2/segment.h | 14 ++---
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 8 ++-
.../segment_v2/column_reader_writer_test.cpp | 11 ++--
.../rowset/segment_v2/column_zone_map_test.cpp | 10 +--
be/test/olap/tablet_schema_helper.h | 12 ++--
8 files changed, 122 insertions(+), 108 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 0752ca9..844feac 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -17,6 +17,7 @@
#include "olap/rowset/segment_v2/column_reader.h"
+#include "common/logging.h"
#include "env/env.h" // for RandomAccessFile
#include "gutil/strings/substitute.h" // for Substitute
#include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo
@@ -71,37 +72,34 @@ struct ParsedPage {
size_t remaining() const { return num_rows - offset_in_page; }
};
+Status ColumnReader::create(const ColumnReaderOptions& opts,
+ const ColumnMetaPB& meta,
+ uint64_t num_rows,
+ RandomAccessFile* file,
+ std::unique_ptr<ColumnReader>* reader) {
+ std::unique_ptr<ColumnReader> reader_local(
+ new ColumnReader(opts, meta, num_rows, file));
+ RETURN_IF_ERROR(reader_local->init());
+ *reader = std::move(reader_local);
+ return Status::OK();
+}
+
ColumnReader::ColumnReader(const ColumnReaderOptions& opts,
const ColumnMetaPB& meta,
uint64_t num_rows,
RandomAccessFile* file)
- : _opts(opts),
- _meta(meta),
- _num_rows(num_rows),
- _file(file) {
+ : _opts(opts), _meta(meta), _num_rows(num_rows), _file(file) {
}
-ColumnReader::~ColumnReader() {
-}
+ColumnReader::~ColumnReader() = default;
Status ColumnReader::init() {
- return _init_once.call([this] { return _do_init_once(); });
-}
-
-Status ColumnReader::_do_init_once() {
_type_info = get_type_info((FieldType)_meta.type());
if (_type_info == nullptr) {
return Status::NotSupported(Substitute("unsupported typeinfo,
type=$0", _meta.type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(),
&_encoding_info));
-
- // Get compress codec
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(),
&_compress_codec));
-
- RETURN_IF_ERROR(_init_ordinal_index());
-
- RETURN_IF_ERROR(_init_column_zone_map());
-
return Status::OK();
}
@@ -175,20 +173,23 @@ Status ColumnReader::read_page(const PagePointer& pp,
OlapReaderStatistics* stat
return Status::OK();
}
-void ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
- const std::vector<CondColumn*>& delete_conditions,
OlapReaderStatistics* stats,
- RowRanges* row_ranges) {
- std::vector<uint32_t> page_indexes;
- _get_filtered_pages(cond_column, stats, delete_conditions, &page_indexes);
- _calculate_row_ranges(page_indexes, row_ranges);
-}
+Status ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
+ const
std::vector<CondColumn*>& delete_conditions,
+ OlapReaderStatistics* stats,
+ RowRanges* row_ranges) {
+ DCHECK(has_zone_map());
+ RETURN_IF_ERROR(_ensure_index_loaded());
-PagePointer ColumnReader::get_dict_page_pointer() const {
- return _meta.dict_page();
+ std::vector<uint32_t> page_indexes;
+ RETURN_IF_ERROR(_get_filtered_pages(cond_column, delete_conditions, stats,
&page_indexes));
+ RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges));
+ return Status::OK();
}
-void ColumnReader::_get_filtered_pages(CondColumn* cond_column,
OlapReaderStatistics* stats,
- const std::vector<CondColumn*>& delete_conditions,
std::vector<uint32_t>* page_indexes) {
+Status ColumnReader::_get_filtered_pages(CondColumn* cond_column,
+ const std::vector<CondColumn*>&
delete_conditions,
+ OlapReaderStatistics* stats,
+ std::vector<uint32_t>* page_indexes) {
FieldType type = _type_info->type();
const std::vector<ZoneMapPB>& zone_maps =
_column_zone_map->get_column_zone_map();
int32_t page_size = _column_zone_map->num_pages();
@@ -231,19 +232,20 @@ void ColumnReader::_get_filtered_pages(CondColumn*
cond_column, OlapReaderStatis
stats->rows_stats_filtered += page_last_id - page_first_id + 1;
}
}
+ return Status::OK();
}
-void ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>&
page_indexes, RowRanges* row_ranges) {
+Status ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>&
page_indexes, RowRanges* row_ranges) {
for (auto i : page_indexes) {
rowid_t page_first_id = _ordinal_index->get_first_row_id(i);
rowid_t page_last_id = _ordinal_index->get_last_row_id(i);
RowRanges page_row_ranges(RowRanges::create_single(page_first_id,
page_last_id + 1));
RowRanges::ranges_union(*row_ranges, page_row_ranges, row_ranges);
}
+ return Status::OK();
}
-// initial ordinal index
-Status ColumnReader::_init_ordinal_index() {
+Status ColumnReader::_load_ordinal_index() {
PagePointer pp = _meta.ordinal_index_page();
PageHandle ph;
OlapReaderStatistics stats;
@@ -251,12 +253,10 @@ Status ColumnReader::_init_ordinal_index() {
_ordinal_index.reset(new OrdinalPageIndex(ph.data(), _num_rows));
RETURN_IF_ERROR(_ordinal_index->load());
-
return Status::OK();
}
-// initialize column zone map
-Status ColumnReader::_init_column_zone_map() {
+Status ColumnReader::_load_zone_map_index() {
if (_meta.has_zone_map_page()) {
PagePointer pp = _meta.zone_map_page();
PageHandle ph;
@@ -272,6 +272,7 @@ Status ColumnReader::_init_column_zone_map() {
}
Status ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter) {
+ RETURN_IF_ERROR(_ensure_index_loaded());
*iter = _ordinal_index->begin();
if (!iter->valid()) {
return Status::NotFound("Failed to seek to first rowid");
@@ -280,6 +281,7 @@ Status
ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter) {
}
Status ColumnReader::seek_at_or_before(rowid_t rowid,
OrdinalPageIndexIterator* iter) {
+ RETURN_IF_ERROR(_ensure_index_loaded());
*iter = _ordinal_index->seek_at_or_before(rowid);
if (!iter->valid()) {
return Status::NotFound(Substitute("Failed to seek to rowid $0, ",
rowid));
@@ -290,8 +292,7 @@ Status ColumnReader::seek_at_or_before(rowid_t rowid,
OrdinalPageIndexIterator*
FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader)
{
}
-FileColumnIterator::~FileColumnIterator() {
-}
+FileColumnIterator::~FileColumnIterator() = default;
Status FileColumnIterator::seek_to_first() {
RETURN_IF_ERROR(_reader->seek_to_first(&_page_iter));
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h
b/be/src/olap/rowset/segment_v2/column_reader.h
index f4d9c77..860f7b1 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -63,12 +63,15 @@ struct ColumnIteratorOptions {
// This will cache data shared by all reader
class ColumnReader {
public:
- ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
- uint64_t num_rows, RandomAccessFile* file);
- ~ColumnReader();
+ // Create an initialized ColumnReader in *reader.
+ // This should be a lightweight operation without I/O.
+ static Status create(const ColumnReaderOptions& opts,
+ const ColumnMetaPB& meta,
+ uint64_t num_rows,
+ RandomAccessFile* file,
+ std::unique_ptr<ColumnReader>* reader);
- // May be called multiple times, subsequent calls will no op.
- Status init();
+ ~ColumnReader();
// create a new column iterator. Client should delete returned iterator
Status new_iterator(ColumnIterator** iterator);
@@ -81,47 +84,63 @@ public:
Status read_page(const PagePointer& pp, OlapReaderStatistics* stats,
PageHandle* handle);
bool is_nullable() const { return _meta.is_nullable(); }
+
const EncodingInfo* encoding_info() const { return _encoding_info; }
+
const TypeInfo* type_info() const { return _type_info; }
- bool has_zone_map() { return _meta.has_zone_map_page(); }
+ bool has_zone_map() const { return _meta.has_zone_map_page(); }
// get row ranges with zone map
- // cond_column is user's query predicate
- // delete_conditions is a vector of delete predicate of different version
- void get_row_ranges_by_zone_map(CondColumn* cond_column, const
std::vector<CondColumn*>& delete_conditions,
- OlapReaderStatistics* stats, RowRanges* row_ranges);
+ // - cond_column is user's query predicate
+ // - delete_conditions is a vector of delete predicate of different version
+ Status get_row_ranges_by_zone_map(CondColumn* cond_column,
+ const std::vector<CondColumn*>&
delete_conditions,
+ OlapReaderStatistics* stats,
+ RowRanges* row_ranges);
- PagePointer get_dict_page_pointer() const;
+ PagePointer get_dict_page_pointer() const { return _meta.dict_page(); }
private:
- Status _do_init_once();
-
- Status _init_ordinal_index();
+ ColumnReader(const ColumnReaderOptions& opts,
+ const ColumnMetaPB& meta,
+ uint64_t num_rows,
+ RandomAccessFile* file);
+ Status init();
- Status _init_column_zone_map();
+ // Read and load necessary column indexes into memory if it hasn't been
loaded.
+ // May be called multiple times, subsequent calls will no op.
+ Status _ensure_index_loaded() {
+ return _load_index_once.call([this] {
+ RETURN_IF_ERROR(_load_zone_map_index());
+ RETURN_IF_ERROR(_load_ordinal_index());
+ return Status::OK();
+ });
+ }
+ Status _load_zone_map_index();
+ Status _load_ordinal_index();
- void _get_filtered_pages(CondColumn* cond_column, OlapReaderStatistics*
stats,
- const std::vector<CondColumn*>& delete_conditions,
std::vector<uint32_t>* page_indexes);
+ Status _get_filtered_pages(CondColumn* cond_column,
+ const std::vector<CondColumn*>&
delete_conditions,
+ OlapReaderStatistics* stats,
+ std::vector<uint32_t>* page_indexes);
- void _calculate_row_ranges(const std::vector<uint32_t>& page_indexes,
RowRanges* row_ranges);
+ Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes,
RowRanges* row_ranges);
private:
ColumnReaderOptions _opts;
ColumnMetaPB _meta;
uint64_t _num_rows;
- RandomAccessFile* _file = nullptr;
+ RandomAccessFile* _file;
- DorisCallOnce<Status> _init_once;
+ // initialized in init()
const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
const BlockCompressionCodec* _compress_codec = nullptr;
- // get page pointer from index
- std::unique_ptr<OrdinalPageIndex> _ordinal_index;
-
- // column zone map info
+ DorisCallOnce<Status> _load_index_once;
std::unique_ptr<ColumnZoneMap> _column_zone_map;
+ std::unique_ptr<OrdinalPageIndex> _ordinal_index;
};
// Base iterator to read one column data
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 39028d4..fca3ddf 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -51,18 +51,12 @@ Segment::Segment(
_tablet_schema(tablet_schema) {
}
-Segment::~Segment() {
- for (auto reader : _column_readers) {
- delete reader;
- }
-}
+Segment::~Segment() = default;
Status Segment::_open() {
RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname,
&_input_file));
- // parse footer to get meta
RETURN_IF_ERROR(_parse_footer());
- // initial all column reader
- RETURN_IF_ERROR(_initial_column_readers());
+ RETURN_IF_ERROR(_create_column_readers());
return Status::OK();
}
@@ -161,11 +155,6 @@ Status Segment::_parse_footer() {
if (!_footer.ParseFromString(footer_buf)) {
return Status::Corruption(Substitute("Bad segment file $0: failed to
parse SegmentFooterPB", _fname));
}
-
- for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
- auto& column_pb = _footer.columns(ordinal);
- _column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
- }
return Status::OK();
}
@@ -183,12 +172,13 @@ Status Segment::_load_index() {
});
}
-Status Segment::_initial_column_readers() {
- // TODO(zc): Lazy init()?
- // There may be too many columns, majority of them would not be used
- // in query, so we should not init them here.
- _column_readers.resize(_tablet_schema->columns().size(), nullptr);
+Status Segment::_create_column_readers() {
+ for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
+ auto& column_pb = _footer.columns(ordinal);
+ _column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
+ }
+ _column_readers.resize(_tablet_schema->columns().size());
for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns();
++ordinal) {
auto& column = _tablet_schema->columns()[ordinal];
auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
@@ -197,11 +187,10 @@ Status Segment::_initial_column_readers() {
}
ColumnReaderOptions opts;
- std::unique_ptr<ColumnReader> reader(
- new ColumnReader(opts, _footer.columns(iter->second),
_footer.num_rows(), _input_file.get()));
- RETURN_IF_ERROR(reader->init());
-
- _column_readers[ordinal] = reader.release();
+ std::unique_ptr<ColumnReader> reader;
+ RETURN_IF_ERROR(ColumnReader::create(
+ opts, _footer.columns(iter->second), _footer.num_rows(),
_input_file.get(), &reader));
+ _column_readers[ordinal] = std::move(reader);
}
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/segment.h
b/be/src/olap/rowset/segment_v2/segment.h
index 2f2a549..bc81e9d 100644
--- a/be/src/olap/rowset/segment_v2/segment.h
+++ b/be/src/olap/rowset/segment_v2/segment.h
@@ -83,7 +83,7 @@ private:
// open segment file and read the minimum amount of necessary information
(footer)
Status _open();
Status _parse_footer();
- Status _initial_column_readers();
+ Status _create_column_readers();
Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
size_t num_short_keys() const { return
_tablet_schema->num_short_key_columns(); }
@@ -122,10 +122,15 @@ private:
SegmentFooterPB _footer;
std::unique_ptr<RandomAccessFile> _input_file;
+ // Map from column unique id to column ordinal in footer's ColumnMetaPB
+ // If we can't find unique id from it, it means this segment is created
+ // with an old schema.
+ std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;
+
// ColumnReader for each column in TabletSchema. If ColumnReader is
nullptr,
// This means that this segment has no data for that column, which may be
added
// after this segment is generated.
- std::vector<ColumnReader*> _column_readers;
+ std::vector<std::unique_ptr<ColumnReader>> _column_readers;
// used to guarantee that short key index will be loaded at most once in a
thread-safe way
DorisCallOnce<Status> _load_index_once;
@@ -133,11 +138,6 @@ private:
faststring _sk_index_buf;
// short key index decoder
std::unique_ptr<ShortKeyIndexDecoder> _sk_index_decoder;
-
- // Map from column unique id to column ordinal in footer's ColumnMetaPB
- // If we can't find unique id from it, it means this segment is created
- // with an old schema.
- std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;
};
}
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index ec5caa6..b74a958 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -170,8 +170,12 @@ Status
SegmentIterator::_get_row_ranges_from_zone_map(RowRanges* zone_map_row_ra
}
// get row ranges by zone map of this column
RowRanges column_zone_map_row_ranges;
-
_segment->_column_readers[cid]->get_row_ranges_by_zone_map(_opts.conditions->get_column(cid),
- column_delete_conditions[cid], _opts.stats,
&column_zone_map_row_ranges);
+ RETURN_IF_ERROR(
+ _segment->_column_readers[cid]->get_row_ranges_by_zone_map(
+ _opts.conditions->get_column(cid),
+ column_delete_conditions[cid],
+ _opts.stats,
+ &column_zone_map_row_ranges));
// intersection different columns's row ranges to get final row ranges
by zone map
RowRanges::ranges_intersection(origin_row_ranges,
column_zone_map_row_ranges, &origin_row_ranges);
}
diff --git a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
index becc17d..97e8376 100644
--- a/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_reader_writer_test.cpp
@@ -107,15 +107,12 @@ void test_nullable_data(uint8_t* src_data, uint8_t*
src_is_null, int num_rows, s
ASSERT_TRUE(st.ok());
ColumnReaderOptions reader_opts;
- ColumnReader reader(reader_opts, meta, num_rows, rfile.get());
-
- st = reader.init();
+ std::unique_ptr<ColumnReader> reader;
+ st = ColumnReader::create(reader_opts, meta, num_rows, rfile.get(),
&reader);
ASSERT_TRUE(st.ok());
- ASSERT_EQ(reader._ordinal_index->num_pages(),
reader._column_zone_map->get_column_zone_map().size());
-
ColumnIterator* iter = nullptr;
- st = reader.new_iterator(&iter);
+ st = reader->new_iterator(&iter);
ASSERT_TRUE(st.ok());
ColumnIteratorOptions iter_opts;
OlapReaderStatistics stats;
@@ -137,7 +134,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t*
src_is_null, int num_rows, s
int idx = 0;
while (true) {
size_t rows_read = 1024;
- auto st = iter->next_batch(&rows_read, &col);
+ st = iter->next_batch(&rows_read, &col);
ASSERT_TRUE(st.ok());
for (int j = 0; j < rows_read; ++j) {
// LOG(INFO) << "is_null=" << is_null[j] << ",
src_is_null[]=" << src_is_null[idx]
diff --git a/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp
b/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp
index 98c3a3c..3a80e1c 100644
--- a/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp
+++ b/be/test/olap/rowset/segment_v2/column_zone_map_test.cpp
@@ -29,13 +29,15 @@ public:
void test_string(Field* field) {
ColumnZoneMapBuilder builder(field);
std::vector<std::string> values1 = {"aaaa", "bbbb", "cccc", "dddd",
"eeee", "ffff"};
- for (auto value : values1) {
- builder.add((const uint8_t*)&value, 1);
+ for (auto& value : values1) {
+ Slice slice(value);
+ builder.add((const uint8_t*)&slice, 1);
}
builder.flush();
std::vector<std::string> values2 = {"aaaaa", "bbbbb", "ccccc",
"ddddd", "eeeee", "fffff"};
- for (auto value : values2) {
- builder.add((const uint8_t*)&value, 1);
+ for (auto& value : values2) {
+ Slice slice(value);
+ builder.add((const uint8_t*)&slice, 1);
}
builder.add(nullptr, 1);
builder.flush();
diff --git a/be/test/olap/tablet_schema_helper.h
b/be/test/olap/tablet_schema_helper.h
index d0a03b7..257415b 100644
--- a/be/test/olap/tablet_schema_helper.h
+++ b/be/test/olap/tablet_schema_helper.h
@@ -83,8 +83,9 @@ TabletColumn create_varchar_key(int32_t id, bool is_nullable
= true) {
void set_column_value_by_type(FieldType fieldType, int src, char* target,
MemPool* pool, size_t _length = 0) {
if (fieldType == OLAP_FIELD_TYPE_CHAR) {
- char* src_value = &std::to_string(src)[0];
- int src_len = strlen(src_value);
+ std::string s = std::to_string(src);
+ char* src_value = &s[0];
+ int src_len = s.size();
auto* dest_slice = (Slice*)target;
dest_slice->size = _length;
@@ -92,13 +93,14 @@ void set_column_value_by_type(FieldType fieldType, int src,
char* target, MemPoo
memcpy(dest_slice->data, src_value, src_len);
memset(dest_slice->data + src_len, 0, dest_slice->size - src_len);
} else if (fieldType == OLAP_FIELD_TYPE_VARCHAR) {
- char* src_value = &std::to_string(src)[0];
- int src_len = strlen(src_value);
+ std::string s = std::to_string(src);
+ char* src_value = &s[0];
+ int src_len = s.size();
auto* dest_slice = (Slice*)target;
dest_slice->size = src_len;
dest_slice->data = (char*)pool->allocate(src_len);
- std::memcpy(dest_slice->data, src_value, src_len);
+ memcpy(dest_slice->data, src_value, src_len);
} else {
*(int*)target = src;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]