This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f80b856405e [enhancement](oom) return error when bloom filter allocate 
memory failed (#35790)
f80b856405e is described below

commit f80b856405e35d59a4d5d11cacc07f5885a16d7e
Author: yiguolei <[email protected]>
AuthorDate: Mon Jun 3 17:32:29 2024 +0800

    [enhancement](oom) return error when bloom filter allocate memory failed 
(#35790)
    
    ## Proposed changes
    
    
    1. Return an error when the bloom filter fails to allocate memory.
    2. Return an error when deserializing a block fails, since it may need a lot of memory.
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/olap/primary_key_index.cpp                           |  2 +-
 be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp | 13 ++++++++-----
 be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h   |  8 +++++---
 be/src/olap/rowset/segment_v2/column_writer.cpp             |  2 +-
 be/src/vec/core/block.cpp                                   |  5 ++++-
 .../segment_v2/bloom_filter_index_reader_writer_test.cpp    |  2 +-
 6 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/primary_key_index.cpp 
b/be/src/olap/primary_key_index.cpp
index 6ea4bc30d0a..d3554cae15d 100644
--- a/be/src/olap/primary_key_index.cpp
+++ b/be/src/olap/primary_key_index.cpp
@@ -57,7 +57,7 @@ Status PrimaryKeyIndexBuilder::init() {
 Status PrimaryKeyIndexBuilder::add_item(const Slice& key) {
     RETURN_IF_ERROR(_primary_key_index_builder->add(&key));
     Slice key_without_seq = Slice(key.get_data(), key.get_size() - 
_seq_col_length - _rowid_length);
-    _bloom_filter_index_builder->add_values(&key_without_seq, 1);
+    RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(&key_without_seq, 
1));
     // the key is already sorted, so the first key is min_key, and
     // the last key is max_key.
     if (UNLIKELY(_num_rows == 0)) {
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp 
b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
index 27914280784..9581c170e7c 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp
@@ -79,13 +79,13 @@ public:
 
     ~BloomFilterIndexWriterImpl() override = default;
 
-    void add_values(const void* values, size_t count) override {
+    Status add_values(const void* values, size_t count) override {
         const CppType* v = (const CppType*)values;
         for (int i = 0; i < count; ++i) {
             if (_values.find(*v) == _values.end()) {
                 if constexpr (_is_slice_type()) {
                     CppType new_value;
-                    _type_info->deep_copy(&new_value, v, &_arena);
+                    
RETURN_IF_CATCH_EXCEPTION(_type_info->deep_copy(&new_value, v, &_arena));
                     _values.insert(new_value);
                 } else if constexpr (_is_int128()) {
                     int128_t new_value;
@@ -97,6 +97,7 @@ public:
             }
             ++v;
         }
+        return Status::OK();
     }
 
     void add_nulls(uint32_t count) override { _has_null = true; }
@@ -175,14 +176,15 @@ private:
 
 } // namespace
 
-void PrimaryKeyBloomFilterIndexWriterImpl::add_values(const void* values, 
size_t count) {
+Status PrimaryKeyBloomFilterIndexWriterImpl::add_values(const void* values, 
size_t count) {
     const Slice* v = (const Slice*)values;
     for (int i = 0; i < count; ++i) {
         Slice new_value;
-        _type_info->deep_copy(&new_value, v, &_arena);
+        RETURN_IF_CATCH_EXCEPTION(_type_info->deep_copy(&new_value, v, 
&_arena));
         _values.push_back(new_value);
         ++v;
     }
+    return Status::OK();
 }
 
 Status PrimaryKeyBloomFilterIndexWriterImpl::flush() {
@@ -247,7 +249,7 @@ 
NGramBloomFilterIndexWriterImpl::NGramBloomFilterIndexWriterImpl(
     static_cast<void>(BloomFilter::create(NGRAM_BLOOM_FILTER, &_bf, bf_size));
 }
 
-void NGramBloomFilterIndexWriterImpl::add_values(const void* values, size_t 
count) {
+Status NGramBloomFilterIndexWriterImpl::add_values(const void* values, size_t 
count) {
     const Slice* src = reinterpret_cast<const Slice*>(values);
     for (int i = 0; i < count; ++i, ++src) {
         if (src->size < _gram_size) {
@@ -255,6 +257,7 @@ void NGramBloomFilterIndexWriterImpl::add_values(const 
void* values, size_t coun
         }
         _token_extractor.string_to_bloom_filter(src->data, src->size, *_bf);
     }
+    return Status::OK();
 }
 
 Status NGramBloomFilterIndexWriterImpl::flush() {
diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h 
b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h
index df92f980c58..a393218f3f9 100644
--- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h
+++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h
@@ -50,7 +50,7 @@ public:
     BloomFilterIndexWriter() = default;
     virtual ~BloomFilterIndexWriter() = default;
 
-    virtual void add_values(const void* values, size_t count) = 0;
+    virtual Status add_values(const void* values, size_t count) = 0;
 
     virtual void add_nulls(uint32_t count) = 0;
 
@@ -85,7 +85,9 @@ public:
         }
     };
 
-    void add_values(const void* values, size_t count) override;
+    // This method may allocate a large amount of memory for the bf; it returns
+    // an error when memory is exhausted, to prevent OOM.
+    Status add_values(const void* values, size_t count) override;
 
     void add_nulls(uint32_t count) override { _has_null = true; }
 
@@ -114,7 +116,7 @@ public:
 
     NGramBloomFilterIndexWriterImpl(const BloomFilterOptions& bf_options, 
uint8_t gram_size,
                                     uint16_t bf_size);
-    void add_values(const void* values, size_t count) override;
+    Status add_values(const void* values, size_t count) override;
     void add_nulls(uint32_t) override {}
     Status flush() override;
     Status finish(io::FileWriter* file_writer, ColumnIndexMetaPB* index_meta) 
override;
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp 
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index cbda176acd6..dee0d520d1f 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -579,7 +579,7 @@ Status 
ScalarColumnWriter::append_data_in_current_page(const uint8_t* data, size
                 _inverted_index_builder->add_values(get_field()->name(), data, 
*num_written));
     }
     if (_opts.need_bloom_filter) {
-        _bloom_filter_index_builder->add_values(data, *num_written);
+        RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, 
*num_written));
     }
 
     _next_rowid += *num_written;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index e6bedd6c78e..39f952c837a 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -100,6 +100,7 @@ Status Block::deserialize(const PBlock& pblock) {
             BlockCompressionCodec* codec;
             
RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
             uncompressed_size = pblock.uncompressed_size();
+            // Should also use allocator to allocate memory here.
             compression_scratch.resize(uncompressed_size);
             Slice decompressed_slice(compression_scratch);
             RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, 
compressed_size),
@@ -123,7 +124,9 @@ Status Block::deserialize(const PBlock& pblock) {
     for (const auto& pcol_meta : pblock.column_metas()) {
         DataTypePtr type = 
DataTypeFactory::instance().create_data_type(pcol_meta);
         MutableColumnPtr data_column = type->create_column();
-        buf = type->deserialize(buf, data_column.get(), 
pblock.be_exec_version());
+        // This may try to allocate a lot of memory; return an error if it fails.
+        RETURN_IF_CATCH_EXCEPTION(
+                buf = type->deserialize(buf, data_column.get(), 
pblock.be_exec_version()));
         data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
     }
     initialize_index_by_name();
diff --git 
a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp 
b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
index 83f7473f4fe..258dd9a5ff8 100644
--- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
+++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp
@@ -78,7 +78,7 @@ void write_bloom_filter_index_file(const std::string& 
file_name, const void* val
         const CppType* vals = (const CppType*)values;
         for (int i = 0; i < value_count;) {
             size_t num = std::min(1024, (int)value_count - i);
-            bloom_filter_index_writer->add_values(vals + i, num);
+            static_cast<void>(bloom_filter_index_writer->add_values(vals + i, 
num));
             if (i == 2048) {
                 // second page
                 bloom_filter_index_writer->add_nulls(null_count);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to