This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee64ab5  Fix segment size (#2549)
ee64ab5 is described below

commit ee64ab55db5c6eeb5a5e98f846ec9827af7d22ee
Author: kangpinghuang <[email protected]>
AuthorDate: Thu Dec 26 11:51:53 2019 +0800

    Fix segment size (#2549)
---
 be/src/olap/rowset/segment_v2/column_writer.cpp    | 87 ++++++++++++++--------
 be/src/olap/rowset/segment_v2/column_writer.h      |  9 ++-
 be/src/olap/rowset/segment_v2/page_compression.cpp | 28 +++++++
 be/src/olap/rowset/segment_v2/page_compression.h   |  6 ++
 run-ut.sh                                          |  1 -
 5 files changed, 99 insertions(+), 32 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 4608d6a..8bb5614 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -211,9 +211,8 @@ uint64_t ColumnWriter::estimate_buffer_size() {
     uint64_t size = 0;
     Page* page = _pages.head;
     while (page != nullptr) {
-        size += page->data.slice().size;
-        if (_is_nullable) {
-            size += page->null_bitmap.slice().get_size();
+        for (auto& data_slice : page->data) {
+            size += data_slice.slice().size;
         }
         page = page->next;
     }
@@ -250,7 +249,7 @@ Status ColumnWriter::write_data() {
         _page_builder->get_dictionary_page(&dict_page);
         std::vector<Slice> origin_data;
         origin_data.push_back(dict_page.slice());
-        RETURN_IF_ERROR(_write_physical_page(&origin_data, &_dict_page_pp));
+        RETURN_IF_ERROR(_compress_and_write_page(&origin_data, 
&_dict_page_pp));
     }
     return Status::OK();
 }
@@ -258,14 +257,15 @@ Status ColumnWriter::write_data() {
 Status ColumnWriter::write_ordinal_index() {
     Slice data = _ordinal_index_builder->finish();
     std::vector<Slice> slices{data};
-    return _write_physical_page(&slices, &_ordinal_index_pp);
+    auto st = _compress_and_write_page(&slices, &_ordinal_index_pp);
+    return st;
 }
 
 Status ColumnWriter::write_zone_map() {
     if (_opts.need_zone_map) {
         OwnedSlice data = _column_zone_map_builder->finish();
         std::vector<Slice> slices{data.slice()};
-        return _write_physical_page(&slices, &_zone_map_pp);
+        RETURN_IF_ERROR(_compress_and_write_page(&slices, &_zone_map_pp));
     }
     return Status::OK();
 }
@@ -311,30 +311,17 @@ void ColumnWriter::write_meta(ColumnMetaPB* meta) {
 // write a page into file and update ordinal index
 // this function will call _write_physical_page to write data
 Status ColumnWriter::_write_data_page(Page* page) {
+    PagePointer pp;
     std::vector<Slice> origin_data;
-    faststring header;
-    // 1. first rowid
-    put_varint32(&header, page->first_rowid);
-    // 2. row count
-    put_varint32(&header, page->num_rows);
-    if (_is_nullable) {
-        put_varint32(&header, page->null_bitmap.slice().get_size());
+    for (auto& data : page->data) {
+        origin_data.push_back(data.slice());
     }
-    origin_data.emplace_back(header.data(), header.size());
-    if (_is_nullable) {
-        origin_data.push_back(page->null_bitmap.slice());
-    }
-    origin_data.push_back(page->data.slice());
-    // TODO(zc): upadte page's statistics
-
-    PagePointer pp;
     RETURN_IF_ERROR(_write_physical_page(&origin_data, &pp));
     _ordinal_index_builder->append_entry(page->first_rowid, pp);
     return Status::OK();
 }
 
-// write a physical page in to files
-Status ColumnWriter::_write_physical_page(std::vector<Slice>* origin_data, 
PagePointer* pp) {
+Status ColumnWriter::_compress_and_write_page(std::vector<Slice>* origin_data, 
PagePointer* pp) {
     std::vector<Slice>* output_data = origin_data;
     std::vector<Slice> compressed_data;
 
@@ -345,18 +332,22 @@ Status 
ColumnWriter::_write_physical_page(std::vector<Slice>* origin_data, PageP
         RETURN_IF_ERROR(compressor.compress(*origin_data, &compressed_data));
         output_data = &compressed_data;
     }
+    return _write_physical_page(output_data, pp);
+}
 
+// write a physical page (the given slices followed by a crc32c checksum) into the file
+Status ColumnWriter::_write_physical_page(std::vector<Slice>* origin_data, 
PagePointer* pp) {
     // checksum
     uint8_t checksum_buf[sizeof(uint32_t)];
-    uint32_t checksum = crc32c::Value(*output_data);
+    uint32_t checksum = crc32c::Value(*origin_data);
     encode_fixed32_le(checksum_buf, checksum);
-    output_data->emplace_back(checksum_buf, sizeof(uint32_t));
+    origin_data->emplace_back(checksum_buf, sizeof(uint32_t));
 
     // remember the offset
     pp->offset = _output_file->size();
     // write content to file
     size_t bytes_written = 0;
-    RETURN_IF_ERROR(_write_raw_data(*output_data, &bytes_written));
+    RETURN_IF_ERROR(_write_raw_data(*origin_data, &bytes_written));
     pp->size = bytes_written;
 
     return Status::OK();
@@ -382,12 +373,50 @@ Status ColumnWriter::_finish_current_page() {
     Page* page = new Page();
     page->first_rowid = _last_first_rowid;
     page->num_rows = _next_rowid - _last_first_rowid;
-    page->data = _page_builder->finish();
-    _page_builder->reset();
+    faststring header;
+    // 1. first rowid
+    put_varint32(&header, page->first_rowid);
+    // 2. row count
+    put_varint32(&header, page->num_rows);
+    OwnedSlice null_bitmap;
     if (_is_nullable) {
-        page->null_bitmap = _null_bitmap_builder->finish();
+        null_bitmap = _null_bitmap_builder->finish();
         _null_bitmap_builder->reset();
+        put_varint32(&header, null_bitmap.slice().get_size());
+    }
+    page->data.emplace_back(std::move(header.build()));
+
+    if (_is_nullable) {
+        page->data.emplace_back(std::move(null_bitmap));
     }
+    OwnedSlice data_slice = _page_builder->finish();
+    _page_builder->reset();
+    page->data.emplace_back(std::move(data_slice));
+
+    // compressed data
+    if (_compress_codec != nullptr) {
+        PageCompressor compressor(_compress_codec);
+        std::vector<Slice> data_slices;
+        size_t origin_size = 0;
+        for (auto& data : page->data) {
+            data_slices.push_back(data.slice());
+            origin_size += data.slice().size;
+        }
+        OwnedSlice compressed_data;
+        bool compressed = false;
+        RETURN_IF_ERROR(compressor.compress(data_slices, &compressed_data, 
&compressed));
+        if (compressed) {
+            page->data.clear();
+            page->data.emplace_back(std::move(compressed_data));
+        } else {
+            size_t uncompressed_bytes = Slice::compute_total_size(data_slices);
+            faststring buf;
+            buf.resize(4);
+            encode_fixed32_le((uint8_t*)buf.data(), uncompressed_bytes);
+            page->data.emplace_back(std::move(buf.build()));
+        }
+    }
+
     // update last first rowid
     _last_first_rowid = _next_rowid;
 
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h 
b/be/src/olap/rowset/segment_v2/column_writer.h
index 92ea5da..3521f98 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -108,8 +108,12 @@ private:
     struct Page {
         int32_t first_rowid;
         int32_t num_rows;
-        OwnedSlice null_bitmap;
-        OwnedSlice data;
+        // the data vector may contain:
+        //     1. one OwnedSlice if the data is compressed
+        //     2. header, null bitmap (nullable only) and data if no codec is set
+        //     3. the slices of case 2 plus a 4-byte size footer if compression was skipped
+        // use vector for easier management for lifetime of OwnedSlice
+        std::vector<OwnedSlice> data;
         Page* next = nullptr;
     };
 
@@ -134,6 +138,7 @@ private:
     Status _write_raw_data(const std::vector<Slice>& data, size_t* 
bytes_written);
 
     Status _write_data_page(Page* page);
+    Status _compress_and_write_page(std::vector<Slice>* origin_data, 
PagePointer* pp);
     Status _write_physical_page(std::vector<Slice>* origin_data, PagePointer* 
pp);
 
 private:
diff --git a/be/src/olap/rowset/segment_v2/page_compression.cpp 
b/be/src/olap/rowset/segment_v2/page_compression.cpp
index e0121b2..c85b2cd 100644
--- a/be/src/olap/rowset/segment_v2/page_compression.cpp
+++ b/be/src/olap/rowset/segment_v2/page_compression.cpp
@@ -88,5 +88,33 @@ Status PageCompressor::compress(const std::vector<Slice>& 
raw_data,
     return Status::OK();
 }
 
+Status PageCompressor::compress(const std::vector<Slice>& raw_data,
+                                OwnedSlice* compressed_data, bool* compressed) 
{
+    size_t uncompressed_bytes = Slice::compute_total_size(raw_data);
+    size_t max_compressed_bytes = 
_codec->max_compressed_len(uncompressed_bytes);
+    _buf.resize(max_compressed_bytes + 4);
+    Slice compression_buffer(_buf.data(), max_compressed_bytes);
+    RETURN_IF_ERROR(_codec->compress(raw_data, &compression_buffer));
+
+    double space_saving = 1.0 - (double)compression_buffer.size / 
uncompressed_bytes;
+    if (compression_buffer.size >= uncompressed_bytes || // use integer to 
make definite
+            space_saving < _min_space_saving) {
+        // If space saving is not high enough, hand back an empty slice with
+        // *compressed = false so the caller stores raw data (no decompression cost)
+        _buf.resize(0);
+        *compressed_data = _buf.build();
+        *compressed = false;
+        return Status::OK();
+    }
+    // encode uncompressed_bytes into footer of compressed value
+    encode_fixed32_le((uint8_t*)_buf.data() + compression_buffer.size, 
uncompressed_bytes);
+    // return compressed data to client
+    _buf.resize(compression_buffer.size + 4);
+    *compressed_data = _buf.build();
+    *compressed = true;
+
+    return Status::OK();
+}
+
 }
 }
diff --git a/be/src/olap/rowset/segment_v2/page_compression.h 
b/be/src/olap/rowset/segment_v2/page_compression.h
index d88e0f3..4e78fa3 100644
--- a/be/src/olap/rowset/segment_v2/page_compression.h
+++ b/be/src/olap/rowset/segment_v2/page_compression.h
@@ -83,6 +83,12 @@ public:
     // smaller enough than raw data, this class will return uncompressed data.
     Status compress(const std::vector<Slice>& raw_data,
                     std::vector<Slice>* compressed_data);
+
+    // Try to compress input raw data into a single OwnedSlice using the given
+    // BlockCompressionCodec. If the compressed page is not small enough relative to
+    // the raw data, *compressed is set to false and compressed_data to an empty slice.
+    Status compress(const std::vector<Slice>& raw_data,
+                    OwnedSlice* compressed_data, bool* compressed);
 private:
     const BlockCompressionCodec* _codec;
 
diff --git a/run-ut.sh b/run-ut.sh
index ecb09fc..66fc095 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -280,7 +280,6 @@ 
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/column_zone_map_test
 ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/row_ranges_test
 ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/frame_of_reference_page_test
 ${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/block_bloom_filter_test
-${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/bloom_filter_page_test
 
${DORIS_TEST_BINARY_DIR}/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test
 ${DORIS_TEST_BINARY_DIR}/olap/txn_manager_test
 ${DORIS_TEST_BINARY_DIR}/olap/storage_types_test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to