This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ee9c0256ddd [Fix](inverted index) fix memory leak in inverted index when encountering fault (#29676) (#29782)
ee9c0256ddd is described below

commit ee9c0256ddd341351935867be407b6c2a08fb994
Author: airborne12 <[email protected]>
AuthorDate: Wed Jan 10 18:22:12 2024 +0800

    [Fix](inverted index) fix memory leak in inverted index when encountering fault (#29676) (#29782)
---
 be/src/clucene                                     |  2 +-
 .../inverted_index_compound_directory.cpp          | 21 ++++++++++-----
 .../segment_v2/inverted_index_compound_directory.h |  4 +++
 .../rowset/segment_v2/inverted_index_writer.cpp    | 26 +++++++++++++++++-
 .../olap/rowset/segment_v2/inverted_index_writer.h |  2 ++
 ...st_index_compound_directory_fault_injection.out |  3 +++
 ...index_compound_directory_fault_injection.groovy | 31 +++++++++++++++++-----
 7 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index c9030853082..ab319e51dfa 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit c90308530828a24fe421a9e19bc1e5e06f1460cd
+Subproject commit ab319e51dfabdf85e6ba33f8d1754cacff8c5cd8
diff --git 
a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 0796102e908..1af26a57674 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -75,9 +75,8 @@
     }
 namespace doris::segment_v2 {
 
-const char* WRITE_LOCK_FILE = "write.lock";
-const char* COMPOUND_FILE_EXTENSION = ".idx";
-const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
+const char* const DorisCompoundDirectory::WRITE_LOCK_FILE = "write.lock";
+const char* const DorisCompoundDirectory::COMPOUND_FILE_EXTENSION = ".idx";
 
 DorisCompoundFileWriter::DorisCompoundFileWriter(CL_NS(store)::Directory* dir) 
{
     if (dir == nullptr) {
@@ -96,7 +95,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     std::vector<std::string> files;
     directory->list(&files);
     // remove write.lock file
-    auto it = std::find(files.begin(), files.end(), WRITE_LOCK_FILE);
+    auto it = std::find(files.begin(), files.end(), 
DorisCompoundDirectory::WRITE_LOCK_FILE);
     if (it != files.end()) {
         files.erase(it);
     }
@@ -114,7 +113,8 @@ void DorisCompoundFileWriter::writeCompoundFile() {
 
     io::Path cfs_path(((DorisCompoundDirectory*)directory)->getCfsDirName());
     auto idx_path = cfs_path.parent_path();
-    std::string idx_name = std::string(cfs_path.stem().c_str()) + 
COMPOUND_FILE_EXTENSION;
+    std::string idx_name =
+            std::string(cfs_path.stem().c_str()) + 
DorisCompoundDirectory::COMPOUND_FILE_EXTENSION;
     // write file entries to ram directory to get header length
     lucene::store::RAMDirectory ram_dir;
     auto* out_idx = ram_dir.createOutput(idx_name.c_str());
@@ -136,7 +136,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
         ram_output->writeLong(0);            // data offset
         ram_output->writeLong(file.second);  // file length
         header_file_length += file.second;
-        if (header_file_length <= MAX_HEADER_DATA_SIZE) {
+        if (header_file_length <= 
DorisCompoundDirectory::MAX_HEADER_DATA_SIZE) {
             copyFile(file.first.c_str(), ram_output.get(), ram_buffer, 
buffer_length);
             header_file_count++;
         }
@@ -375,6 +375,13 @@ void 
DorisCompoundDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_
 void DorisCompoundDirectory::FSIndexOutput::init(const io::FileSystemSPtr& 
fileSystem,
                                                  const char* path) {
     Status status = fileSystem->create_file(path, &_writer);
+    DBUG_EXECUTE_IF(
+            
"DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
+            "init",
+            {
+                status = 
Status::Error<doris::ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+                        "debug point: test throw error in fsindexoutput init 
mock error");
+            })
     if (!status.ok()) {
         _writer.reset(nullptr);
         auto err = "Create compound file error: " + status.to_string();
@@ -671,6 +678,8 @@ lucene::store::IndexOutput* 
DorisCompoundDirectory::createOutput(const char* nam
     try {
         ret->init(fs, fl);
     } catch (CLuceneError& err) {
+        ret->close();
+        _CLDELETE(ret)
         LOG(WARNING) << "FSIndexOutput init error: " << err.what();
         _CLTHROWA(CL_ERR_IO, "FSIndexOutput init error");
     }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h 
b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index 7ae0e618a45..94f8b3ccc79 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -55,6 +55,10 @@ private:
 };
 
 class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
+public:
+    static const char* const WRITE_LOCK_FILE;
+    static const char* const COMPOUND_FILE_EXTENSION;
+    static const int64_t MAX_HEADER_DATA_SIZE = 1024 * 128; // 128k
 private:
     int filemode;
 
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp 
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index d82f6f6b556..9d7a047c606 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -114,6 +114,24 @@ public:
         }
     }
 
+    void close_on_error() override {
+        try {
+            if (_index_writer) {
+                _index_writer->close();
+            }
+            if (_dir) {
+                _dir->deleteDirectory();
+                io::Path cfs_path(_dir->getCfsDirName());
+                auto idx_path = cfs_path.parent_path();
+                std::string idx_name = std::string(cfs_path.stem().c_str()) +
+                                       
DorisCompoundDirectory::COMPOUND_FILE_EXTENSION;
+                _dir->deleteFile(idx_name.c_str());
+            }
+        } catch (CLuceneError& e) {
+            LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << 
e.what();
+        }
+    }
+
     Status init_bkd_index() {
         size_t value_length = sizeof(CppType);
         // NOTE: initialize with 0, set to max_row_id when finished.
@@ -223,6 +241,7 @@ public:
         try {
             _index_writer->addDocument(_doc.get());
         } catch (const CLuceneError& e) {
+            close_on_error();
             _dir->deleteDirectory();
             return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                     "CLuceneError add_document: {}", e.what());
@@ -234,6 +253,7 @@ public:
         try {
             _index_writer->addNullDocument(_doc.get());
         } catch (const CLuceneError& e) {
+            close_on_error();
             _dir->deleteDirectory();
             return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                     "CLuceneError add_null_document: {}", e.what());
@@ -692,7 +712,11 @@ Status InvertedIndexColumnWriter::create(const Field* 
field,
                                     std::to_string(int(type)));
     }
     if (*res != nullptr) {
-        RETURN_IF_ERROR((*res)->init());
+        auto st = (*res)->init();
+        if (!st.ok()) {
+            (*res)->close_on_error();
+            return st;
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.h 
b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
index f9bce406408..44cc41789b5 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.h
@@ -60,6 +60,8 @@ public:
 
     virtual int64_t file_size() const = 0;
 
+    virtual void close_on_error() = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(InvertedIndexColumnWriter);
 };
diff --git 
a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
 
b/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
index bf59811cc65..5082f2d1578 100644
--- 
a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
+++ 
b/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
@@ -17,3 +17,6 @@
 -- !sql --
 0
 
+-- !sql --
+0
+
diff --git 
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
index 1c6f969dd2f..dfd056c978d 100644
--- 
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
@@ -118,21 +118,40 @@ suite("test_index_compound_directory_failure_injection", 
"nonConcurrent") {
         }
         qt_sql "select COUNT() from ${testTable_dup} where request match 
'images'"
         try {
-            create_httplogs_dup_table.call("test_index_compound_directory1")
+            def test_index_compound_directory = 
"test_index_compound_directory1"
+            sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+            create_httplogs_dup_table.call(test_index_compound_directory)
             
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
-            load_httplogs_data.call("test_index_compound_directory1", 
'test_index_compound_directory1', 'true', 'json', 'documents-1000.json')
+            load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+            qt_sql "select COUNT() from ${test_index_compound_directory} where 
request match 'gif'"
+            try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
+        } catch(Exception ex) {
+            logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, 
 result: " + ex)
         } finally {
             
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
         }
-        qt_sql "select COUNT() from test_index_compound_directory1 where 
request match 'images'"
         try {
-            create_httplogs_dup_table.call("test_index_compound_directory2")
+            def test_index_compound_directory = 
"test_index_compound_directory2"
+            sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+            create_httplogs_dup_table.call(test_index_compound_directory)
             
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
-            load_httplogs_data.call("test_index_compound_directory2", 
'test_index_compound_directory2', 'true', 'json', 'documents-1000.json')
+            load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+            qt_sql "select COUNT() from ${test_index_compound_directory} where 
request match 'images'"
+            try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
         } finally {
             
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
         }
-        qt_sql "select COUNT() from test_index_compound_directory2 where 
request match 'images'"
+        try {
+            def test_index_compound_directory = 
"test_index_compound_directory3"
+            sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+            create_httplogs_dup_table.call(test_index_compound_directory)
+            
GetDebugPoint().enableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+            load_httplogs_data.call(test_index_compound_directory, 
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+            qt_sql "select COUNT() from ${test_index_compound_directory} where 
request match 'png'"
+            try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("DorisCompoundDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+        }
     } finally {
         //try_sql("DROP TABLE IF EXISTS ${testTable}")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to