This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ee64ab5 Fix segment size (#2549)
ee64ab5 is described below
commit ee64ab55db5c6eeb5a5e98f846ec9827af7d22ee
Author: kangpinghuang <[email protected]>
AuthorDate: Thu Dec 26 11:51:53 2019 +0800
Fix segment size (#2549)
---
be/src/olap/rowset/segment_v2/column_writer.cpp | 87 ++++++++++++++--------
be/src/olap/rowset/segment_v2/column_writer.h | 9 ++-
be/src/olap/rowset/segment_v2/page_compression.cpp | 28 +++++++
be/src/olap/rowset/segment_v2/page_compression.h | 6 ++
run-ut.sh | 1 -
5 files changed, 99 insertions(+), 32 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 4608d6a..8bb5614 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -211,9 +211,8 @@ uint64_t ColumnWriter::estimate_buffer_size() {
uint64_t size = 0;
Page* page = _pages.head;
while (page != nullptr) {
- size += page->data.slice().size;
- if (_is_nullable) {
- size += page->null_bitmap.slice().get_size();
+ for (auto& data_slice : page->data) {
+ size += data_slice.slice().size;
}
page = page->next;
}
@@ -250,7 +249,7 @@ Status ColumnWriter::write_data() {
_page_builder->get_dictionary_page(&dict_page);
std::vector<Slice> origin_data;
origin_data.push_back(dict_page.slice());
- RETURN_IF_ERROR(_write_physical_page(&origin_data, &_dict_page_pp));
+ RETURN_IF_ERROR(_compress_and_write_page(&origin_data,
&_dict_page_pp));
}
return Status::OK();
}
@@ -258,14 +257,15 @@ Status ColumnWriter::write_data() {
Status ColumnWriter::write_ordinal_index() {
Slice data = _ordinal_index_builder->finish();
std::vector<Slice> slices{data};
- return _write_physical_page(&slices, &_ordinal_index_pp);
+ auto st = _compress_and_write_page(&slices, &_ordinal_index_pp);
+ return st;
}
Status ColumnWriter::write_zone_map() {
if (_opts.need_zone_map) {
OwnedSlice data = _column_zone_map_builder->finish();
std::vector<Slice> slices{data.slice()};
- return _write_physical_page(&slices, &_zone_map_pp);
+ RETURN_IF_ERROR(_compress_and_write_page(&slices, &_zone_map_pp));
}
return Status::OK();
}
@@ -311,30 +311,17 @@ void ColumnWriter::write_meta(ColumnMetaPB* meta) {
// write a page into file and update ordinal index
// this function will call _write_physical_page to write data
Status ColumnWriter::_write_data_page(Page* page) {
+ PagePointer pp;
std::vector<Slice> origin_data;
- faststring header;
- // 1. first rowid
- put_varint32(&header, page->first_rowid);
- // 2. row count
- put_varint32(&header, page->num_rows);
- if (_is_nullable) {
- put_varint32(&header, page->null_bitmap.slice().get_size());
+ for (auto& data : page->data) {
+ origin_data.push_back(data.slice());
}
- origin_data.emplace_back(header.data(), header.size());
- if (_is_nullable) {
- origin_data.push_back(page->null_bitmap.slice());
- }
- origin_data.push_back(page->data.slice());
- // TODO(zc): upadte page's statistics
-
- PagePointer pp;
RETURN_IF_ERROR(_write_physical_page(&origin_data, &pp));
_ordinal_index_builder->append_entry(page->first_rowid, pp);
return Status::OK();
}
-// write a physical page in to files
-Status ColumnWriter::_write_physical_page(std::vector<Slice>* origin_data,
PagePointer* pp) {
+Status ColumnWriter::_compress_and_write_page(std::vector<Slice>* origin_data,
PagePointer* pp) {
std::vector<Slice>* output_data = origin_data;
std::vector<Slice> compressed_data;
@@ -345,18 +332,22 @@ Status
ColumnWriter::_write_physical_page(std::vector<Slice>* origin_data, PageP
RETURN_IF_ERROR(compressor.compress(*origin_data, &compressed_data));
output_data = &compressed_data;
}
+ return _write_physical_page(output_data, pp);
+}
+// write a physical page into the file
+Status ColumnWriter::_write_physical_page(std::vector<Slice>* origin_data,
PagePointer* pp) {
// checksum
uint8_t checksum_buf[sizeof(uint32_t)];
- uint32_t checksum = crc32c::Value(*output_data);
+ uint32_t checksum = crc32c::Value(*origin_data);
encode_fixed32_le(checksum_buf, checksum);
- output_data->emplace_back(checksum_buf, sizeof(uint32_t));
+ origin_data->emplace_back(checksum_buf, sizeof(uint32_t));
// remember the offset
pp->offset = _output_file->size();
// write content to file
size_t bytes_written = 0;
- RETURN_IF_ERROR(_write_raw_data(*output_data, &bytes_written));
+ RETURN_IF_ERROR(_write_raw_data(*origin_data, &bytes_written));
pp->size = bytes_written;
return Status::OK();
@@ -382,12 +373,50 @@ Status ColumnWriter::_finish_current_page() {
Page* page = new Page();
page->first_rowid = _last_first_rowid;
page->num_rows = _next_rowid - _last_first_rowid;
- page->data = _page_builder->finish();
- _page_builder->reset();
+ faststring header;
+ // 1. first rowid
+ put_varint32(&header, page->first_rowid);
+ // 2. row count
+ put_varint32(&header, page->num_rows);
+ OwnedSlice null_bitmap;
if (_is_nullable) {
- page->null_bitmap = _null_bitmap_builder->finish();
+ null_bitmap = _null_bitmap_builder->finish();
_null_bitmap_builder->reset();
+ put_varint32(&header, null_bitmap.slice().get_size());
+ }
+ page->data.emplace_back(std::move(header.build()));
+
+ if (_is_nullable) {
+ page->data.emplace_back(std::move(null_bitmap));
}
+ OwnedSlice data_slice = _page_builder->finish();
+ _page_builder->reset();
+ page->data.emplace_back(std::move(data_slice));
+
+ // compressed data
+ if (_compress_codec != nullptr) {
+ PageCompressor compressor(_compress_codec);
+ std::vector<Slice> data_slices;
+ size_t origin_size = 0;
+ for (auto& data : page->data) {
+ data_slices.push_back(data.slice());
+ origin_size += data.slice().size;
+ }
+ OwnedSlice compressed_data;
+ bool compressed = false;
+ RETURN_IF_ERROR(compressor.compress(data_slices, &compressed_data,
&compressed));
+ if (compressed) {
+ page->data.clear();
+ page->data.emplace_back(std::move(compressed_data));
+ } else {
+ size_t uncompressed_bytes = Slice::compute_total_size(data_slices);
+ faststring buf;
+ buf.resize(4);
+ encode_fixed32_le((uint8_t*)buf.data(), uncompressed_bytes);
+ page->data.emplace_back(std::move(buf.build()));
+ }
+ }
+
// update last first rowid
_last_first_rowid = _next_rowid;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h
b/be/src/olap/rowset/segment_v2/column_writer.h
index 92ea5da..3521f98 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -108,8 +108,12 @@ private:
struct Page {
int32_t first_rowid;
int32_t num_rows;
- OwnedSlice null_bitmap;
- OwnedSlice data;
+ // the data vector may contain:
+ // 1. one OwnedSlice if the page was compressed
+ // 2. otherwise the page header, the null bitmap (nullable columns only)
+ //    and the encoded data, followed by a 4-byte uncompressed-size footer
+ //    when a compression codec is set but compression was not applied
+ // use vector for easier management for lifetime of OwnedSlice
+ std::vector<OwnedSlice> data;
Page* next = nullptr;
};
@@ -134,6 +138,7 @@ private:
Status _write_raw_data(const std::vector<Slice>& data, size_t*
bytes_written);
Status _write_data_page(Page* page);
+ Status _compress_and_write_page(std::vector<Slice>* origin_data,
PagePointer* pp);
Status _write_physical_page(std::vector<Slice>* origin_data, PagePointer*
pp);
private:
diff --git a/be/src/olap/rowset/segment_v2/page_compression.cpp
b/be/src/olap/rowset/segment_v2/page_compression.cpp
index e0121b2..c85b2cd 100644
--- a/be/src/olap/rowset/segment_v2/page_compression.cpp
+++ b/be/src/olap/rowset/segment_v2/page_compression.cpp
@@ -88,5 +88,33 @@ Status PageCompressor::compress(const std::vector<Slice>&
raw_data,
return Status::OK();
}
+Status PageCompressor::compress(const std::vector<Slice>& raw_data,
+ OwnedSlice* compressed_data, bool* compressed)
{
+ size_t uncompressed_bytes = Slice::compute_total_size(raw_data);
+ size_t max_compressed_bytes =
_codec->max_compressed_len(uncompressed_bytes);
+ _buf.resize(max_compressed_bytes + 4);
+ Slice compression_buffer(_buf.data(), max_compressed_bytes);
+ RETURN_IF_ERROR(_codec->compress(raw_data, &compression_buffer));
+
+ double space_saving = 1.0 - (double)compression_buffer.size /
uncompressed_bytes;
+ if (compression_buffer.size >= uncompressed_bytes || // use integer to
make definite
+ space_saving < _min_space_saving) {
+ // If the space saving is not high enough, hand back an emptied buffer and
+ // set *compressed to false so the caller keeps its uncompressed data,
+ // to avoid decompression CPU cost
+ _buf.resize(0);
+ *compressed_data = _buf.build();
+ *compressed = false;
+ return Status::OK();
+ }
+ // encode uncompressed_bytes into footer of compressed value
+ encode_fixed32_le((uint8_t*)_buf.data() + compression_buffer.size,
uncompressed_bytes);
+ // return compressed data to client
+ _buf.resize(compression_buffer.size + 4);
+ *compressed_data = _buf.build();
+ *compressed = true;
+
+ return Status::OK();
+}
+
}
}
diff --git a/be/src/olap/rowset/segment_v2/page_compression.h
b/be/src/olap/rowset/segment_v2/page_compression.h
index d88e0f3..4e78fa3 100644
--- a/be/src/olap/rowset/segment_v2/page_compression.h
+++ b/be/src/olap/rowset/segment_v2/page_compression.h
@@ -83,6 +83,12 @@ public:
// smaller enough than raw data, this class will return uncompressed data.
Status compress(const std::vector<Slice>& raw_data,
std::vector<Slice>* compressed_data);
+
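+ // Try to compress the input raw data into a single compressed page, returned
+ // as an OwnedSlice, using the given BlockCompressionCodec. If the compressed
+ // page is not sufficiently smaller than the raw data, *compressed is set to
+ // false and compressed_data is built from an emptied buffer, so the caller
+ // must keep using its own uncompressed data.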
+ Status compress(const std::vector<Slice>& raw_data,
+ OwnedSlice* compressed_data, bool* compressed);
private:
const BlockCompressionCodec* _codec;
diff --git a/run-ut.sh b/run-ut.sh
index ecb09fc..66fc095 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -280,7 +280,6 @@
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_zone_map_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/row_ranges_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/frame_of_reference_page_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/block_bloom_filter_test
-${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/bloom_filter_page_test
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test
${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test
${DORIS_TEST_BINARY_DIR}/olap/storage_types_test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]