This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4746e9e3a23 [opt](inverted index)Optimize code to get rid of heap use
after free (#45745) (#46075)
4746e9e3a23 is described below
commit 4746e9e3a23cf7835c60465e1bfdb95cc72abc8b
Author: qiye <[email protected]>
AuthorDate: Fri Dec 27 16:46:58 2024 +0800
[opt](inverted index)Optimize code to get rid of heap use after free
(#45745) (#46075)
bp #45745
---
.../segment_v2/inverted_index_compound_reader.cpp | 16 ++
.../segment_v2/inverted_index_compound_reader.h | 1 +
.../segment_v2/inverted_index_fs_directory.cpp | 4 +-
.../rowset/segment_v2/inverted_index_writer.cpp | 12 +-
...st_index_compound_directory_fault_injection.out | 22 ---
...index_compound_directory_fault_injection.groovy | 197 ++++++++++++++-------
6 files changed, 156 insertions(+), 96 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
index 5e6d8747a2d..08ab1b6cc88 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -214,6 +214,9 @@ const char* DorisCompoundReader::getObjectName() const {
}
bool DorisCompoundReader::list(std::vector<std::string>* names) const {
+ if (_closed || entries == nullptr) {
+ _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+ }
for (EntriesType::const_iterator i = entries->begin(); i !=
entries->end(); i++) {
names->push_back(i->first);
}
@@ -221,6 +224,9 @@ bool DorisCompoundReader::list(std::vector<std::string>*
names) const {
}
bool DorisCompoundReader::fileExists(const char* name) const {
+ if (_closed || entries == nullptr) {
+ _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+ }
return entries->exists((char*)name);
}
@@ -237,6 +243,9 @@ int64_t DorisCompoundReader::fileModified(const char* name)
const {
}
int64_t DorisCompoundReader::fileLength(const char* name) const {
+ if (_closed || entries == nullptr) {
+ _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+ }
ReaderFileEntry* e = entries->get((char*)name);
if (e == nullptr) {
char buf[CL_MAX_PATH + 30];
@@ -251,6 +260,9 @@ int64_t DorisCompoundReader::fileLength(const char* name)
const {
bool DorisCompoundReader::openInput(const char* name,
std::unique_ptr<lucene::store::IndexInput>& ret,
CLuceneError& error, int32_t bufferSize) {
+ if (_closed || entries == nullptr) {
+ _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
+ }
lucene::store::IndexInput* tmp;
bool success = openInput(name, tmp, error, bufferSize);
if (success) {
@@ -294,6 +306,10 @@ void DorisCompoundReader::close() {
_CLDELETE(stream)
}
if (entries != nullptr) {
+        // The life cycle of entries should be consistent with that of the DorisCompoundReader.
+        // DO NOT DELETE entries here; it will be deleted in the destructor.
+        // When the directory is closed, all entries are cleared, but the directory
+        // may still be used in other places. Deleting the entries object here
+        // would cause a heap-use-after-free (core dump).
entries->clear();
}
if (ram_dir) {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
index 1ca2d6ad371..bc5ae415052 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h
@@ -72,6 +72,7 @@ private:
std::string directory;
std::string file_name;
CL_NS(store)::IndexInput* stream = nullptr;
+ // The life cycle of entries should be consistent with that of the
DorisCompoundReader.
EntriesType* entries = nullptr;
std::mutex _this_lock;
bool _closed = false;
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index 9dbe0986755..9e2e253c404 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -634,10 +634,8 @@ bool DorisRAMFSDirectory::doDeleteFile(const char* name) {
SCOPED_LOCK_MUTEX(this->THIS_LOCK);
sizeInBytes -= itr->second->sizeInBytes;
filesMap->removeitr(itr);
- return true;
- } else {
- return false;
}
+ return true;
}
bool DorisRAMFSDirectory::deleteDirectory() {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index a50b34b5fb1..4e503685e68 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -138,12 +138,14 @@ public:
void close_on_error() override {
try {
- if (_index_writer) {
- _index_writer->close();
- }
+            // Deleting the directory must be done before closing the
+            // index_writer, because the index_writer will close the directory.
if (_dir) {
_dir->deleteDirectory();
}
+ if (_index_writer) {
+ _index_writer->close();
+ }
} catch (CLuceneError& e) {
LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " <<
e.what();
}
@@ -664,11 +666,13 @@ private:
std::unique_ptr<lucene::document::Document> _doc = nullptr;
lucene::document::Field* _field = nullptr;
bool _single_field = true;
+ // Since _index_writer's write.lock is created by _dir.lockFactory,
+ // _dir must destruct after _index_writer, so _dir must be defined before
_index_writer.
+ DorisFSDirectory* _dir = nullptr;
std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
- DorisFSDirectory* _dir = nullptr;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
diff --git
a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
b/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
deleted file mode 100644
index 4efc0928fb7..00000000000
---
a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out
+++ /dev/null
@@ -1,22 +0,0 @@
--- This file is automatically generated. You should know what you did if you
want to edit this
--- !sql --
-863
-
--- !sql --
-863
-
--- !sql --
-863
-
--- !sql --
-863
-
--- !sql --
-0
-
--- !sql --
-0
-
--- !sql --
-0
-
diff --git
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
index 759c409a850..97cd829ec79 100644
---
a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
+++
b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy
@@ -15,9 +15,35 @@
// specific language governing permissions and limitations
// under the License.
-suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
+suite("test_index_compound_directory_fault_injection", "nonConcurrent") {
// define a sql table
def testTable_dup = "httplogs_dup_compound"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ }
+
+ def check_config = { String key, String value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+ logger.info("Show config: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == key) {
+ assertEquals(value, ((List<String>) ele)[2])
+ }
+ }
+ }
+ }
def create_httplogs_dup_table = {testTablex ->
// multi-line sql
@@ -85,74 +111,111 @@ suite("test_index_compound_directory_failure_injection",
"nonConcurrent") {
}
}
- try {
- sql "DROP TABLE IF EXISTS ${testTable_dup}"
- create_httplogs_dup_table.call(testTable_dup)
-
- try {
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
- load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
- }
- qt_sql "select COUNT() from ${testTable_dup} where request match
'images'"
- try {
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
- load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
- }
- qt_sql "select COUNT() from ${testTable_dup} where request match
'images'"
- try {
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
- load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
- }
- qt_sql "select COUNT() from ${testTable_dup} where request match
'images'"
- try {
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
- load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
- }
- qt_sql "select COUNT() from ${testTable_dup} where request match
'images'"
- try {
- def test_index_compound_directory =
"test_index_compound_directory1"
- sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
- create_httplogs_dup_table.call(test_index_compound_directory)
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
- load_httplogs_data.call(test_index_compound_directory,
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
- qt_sql "select COUNT() from ${test_index_compound_directory} where
request match 'gif'"
- try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
- } catch(Exception ex) {
- logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer,
result: " + ex)
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
- }
- try {
- def test_index_compound_directory =
"test_index_compound_directory2"
- sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
- create_httplogs_dup_table.call(test_index_compound_directory)
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
- load_httplogs_data.call(test_index_compound_directory,
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
- qt_sql "select COUNT() from ${test_index_compound_directory} where
request match 'images'"
- try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
- } finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
- }
+ def run_test = {String is_enable ->
+ boolean invertedIndexRAMDirEnable = false
+ boolean has_update_be_config = false
try {
- def test_index_compound_directory =
"test_index_compound_directory3"
- sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
- create_httplogs_dup_table.call(test_index_compound_directory)
-
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
- load_httplogs_data.call(test_index_compound_directory,
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
- qt_sql "select COUNT() from ${test_index_compound_directory} where
request match 'png'"
- try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
+ String backend_id;
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] ==
"inverted_index_ram_dir_enable") {
+ invertedIndexRAMDirEnable =
Boolean.parseBoolean(((List<String>) ele)[2])
+ logger.info("inverted_index_ram_dir_enable:
${((List<String>) ele)[2]}")
+ }
+ }
+ set_be_config.call("inverted_index_ram_dir_enable", is_enable)
+ has_update_be_config = true
+ // check updated config
+ check_config.call("inverted_index_ram_dir_enable", is_enable);
+
+ sql "DROP TABLE IF EXISTS ${testTable_dup}"
+ create_httplogs_dup_table.call(testTable_dup)
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
+ load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
+ }
+ def res = sql "select COUNT() from ${testTable_dup} where request
match 'images'"
+ assertEquals(863, res[0][0])
+ sql "TRUNCATE TABLE ${testTable_dup}"
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
+ load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
+ }
+ res = sql "select COUNT() from ${testTable_dup} where request
match 'images'"
+ assertEquals(0, res[0][0])
+ sql "TRUNCATE TABLE ${testTable_dup}"
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
+ load_httplogs_data.call(testTable_dup,
'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
+ }
+ res = sql "select COUNT() from ${testTable_dup} where request
match 'images'"
+ assertEquals(0, res[0][0])
+ sql "TRUNCATE TABLE ${testTable_dup}"
+
+ try {
+ def test_index_compound_directory =
"test_index_compound_directory1"
+ sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+ create_httplogs_dup_table.call(test_index_compound_directory)
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
+ load_httplogs_data.call(test_index_compound_directory,
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+ res = sql "select COUNT() from
${test_index_compound_directory} where request match 'gif'"
+ try_sql("DROP TABLE IF EXISTS
${test_index_compound_directory}")
+ } catch(Exception ex) {
+ assertTrue(ex.toString().contains("failed to initialize
storage reader"))
+
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " +
ex)
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
+ }
+
+ try {
+ def test_index_compound_directory =
"test_index_compound_directory2"
+ sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+ create_httplogs_dup_table.call(test_index_compound_directory)
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
+ load_httplogs_data.call(test_index_compound_directory,
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+ res = sql "select COUNT() from
${test_index_compound_directory} where request match 'images'"
+ assertEquals(0, res[0][0])
+ try_sql("DROP TABLE IF EXISTS
${test_index_compound_directory}")
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
+ }
+
+ try {
+ def test_index_compound_directory =
"test_index_compound_directory3"
+ sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
+ create_httplogs_dup_table.call(test_index_compound_directory)
+
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+ load_httplogs_data.call(test_index_compound_directory,
test_index_compound_directory, 'true', 'json', 'documents-1000.json')
+ res = sql "select COUNT() from
${test_index_compound_directory} where request match 'png'"
+ assertEquals(0, res[0][0])
+ try_sql("DROP TABLE IF EXISTS
${test_index_compound_directory}")
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+ }
} finally {
-
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
+ if (has_update_be_config) {
+ set_be_config.call("inverted_index_ram_dir_enable",
invertedIndexRAMDirEnable.toString())
+ }
}
- } finally {
- //try_sql("DROP TABLE IF EXISTS ${testTable}")
}
+
+ run_test.call("false")
+ run_test.call("true")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]