This is an automated email from the ASF dual-hosted git repository.
jianliangqi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c6b15ff4eb4 [test](inverted index)Add fault injection cases for index writing (#39649) (#43931)
c6b15ff4eb4 is described below
commit c6b15ff4eb4a75a62343600fa8343341a5420650
Author: qiye <[email protected]>
AuthorDate: Thu Nov 14 16:08:46 2024 +0800
[test](inverted index)Add fault injection cases for index writing (#39649) (#43931)
bp #39649
---
be/src/olap/compaction.cpp | 56 +++-
be/src/olap/inverted_index_parser.cpp | 1 +
be/src/olap/olap_server.cpp | 3 +
.../char_filter/char_filter_factory.h | 1 +
.../segment_v2/inverted_index_compaction.cpp | 16 +-
.../segment_v2/inverted_index_file_writer.cpp | 209 +++++++------
.../rowset/segment_v2/inverted_index_file_writer.h | 4 +-
.../segment_v2/inverted_index_fs_directory.cpp | 182 +++++++++--
.../rowset/segment_v2/inverted_index_writer.cpp | 95 +++++-
be/src/olap/task/index_builder.cpp | 87 +++++-
...dex_compaction_exception_fault_injection.groovy | 341 +++++++++++++++++++++
...inverted_index_exception_fault_injection.groovy | 301 ++++++++++++++++++
...st_build_index_exception_fault_injection.groovy | 263 ++++++++++++++++
13 files changed, 1417 insertions(+), 142 deletions(-)
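The three new Groovy suites drive these fault points through the
regression-test framework's debug-point helpers. As a minimal sketch of the
pattern (assuming BEs started with enable_debug_points = true; the table name
and the load statement are illustrative, not copied from the suites):

    // Arm one of the BE-side fault points added in this commit.
    def injection = "Compaction::do_inverted_index_compaction_can_not_find_index_meta"
    try {
        GetDebugPoint().enableDebugPointForAllBEs(injection)
        // Exercise the guarded path: load data, then trigger compaction or
        // index build, and assert that the injected error status surfaces.
        sql """ INSERT INTO test_tbl VALUES (1, 'hello world') """
    } finally {
        // Always disarm so later cases run against clean BEs.
        GetDebugPoint().disableDebugPointForAllBEs(injection)
    }

Each DBUG_EXECUTE_IF block in the C++ changes below fires only while its named
debug point is enabled, so the error-handling branches become reachable in
tests without real I/O or CLucene failures.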
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 2420154e013..a92fe28abf5 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -510,6 +510,8 @@ Status Compaction::do_inverted_index_compaction() {
auto src_segment_num = src_seg_to_id_map.size();
auto dest_segment_num = dest_segment_num_rows.size();
+ DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_dest_segment_num_is_zero",
+ { dest_segment_num = 0; })
if (dest_segment_num <= 0) {
LOG(INFO) << "skip doing index compaction due to no output segments"
<< ". tablet=" << _tablet->tablet_id() << ", input row
number=" << _input_row_num
@@ -584,14 +586,17 @@ Status Compaction::do_inverted_index_compaction() {
const auto& [rowset_id, seg_id] = m.first;
auto find_it = rs_id_to_rowset_map.find(rowset_id);
+ DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_find_rowset_error",
+ { find_it = rs_id_to_rowset_map.end(); })
if (find_it == rs_id_to_rowset_map.end()) [[unlikely]] {
- DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id;
+ // DCHECK(false) << _tablet->tablet_id() << ' ' << rowset_id;
return Status::InternalError("cannot find rowset. tablet_id={}
rowset_id={}",
_tablet->tablet_id(),
rowset_id.to_string());
}
auto* rowset = find_it->second;
- const auto& fs = rowset->rowset_meta()->fs();
+ auto fs = rowset->rowset_meta()->fs();
+ DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_get_fs_error", { fs = nullptr; })
if (!fs) {
return Status::InternalError("get fs failed, resource_id={}",
rowset->rowset_meta()->resource_id());
@@ -639,6 +644,13 @@ Status Compaction::do_inverted_index_compaction() {
for (auto&& column_uniq_id : ctx.columns_to_do_index_compaction) {
auto col = _cur_tablet_schema->column_by_uid(column_uniq_id);
const auto* index_meta = _cur_tablet_schema->inverted_index(col);
+ DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_can_not_find_index_meta",
+ { index_meta = nullptr; })
+ if (index_meta == nullptr) {
+ status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(
+ fmt::format("Can not find index_meta for col {}", col.name()));
+ break;
+ }
std::vector<lucene::store::Directory*>
dest_index_dirs(dest_segment_num);
try {
@@ -701,10 +713,13 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
is_continue = true;
break;
}
- const auto& properties = tablet_index->properties();
+ auto properties = tablet_index->properties();
if (!first_properties.has_value()) {
first_properties = properties;
} else {
+ DBUG_EXECUTE_IF(
+ "Compaction::do_inverted_index_compaction_index_properties_different",
+ { properties.emplace("dummy_key", "dummy_value"); })
if (properties != first_properties.value()) {
is_continue = true;
break;
@@ -716,6 +731,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
}
auto has_inverted_index = [&](const RowsetSharedPtr& src_rs) {
auto* rowset = static_cast<BetaRowset*>(src_rs.get());
+ DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_is_skip_index_compaction",
+ { rowset->set_skip_index_compaction(col_unique_id); })
if (rowset->is_skip_index_compaction(col_unique_id)) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "]
rowset["
<< rowset->rowset_id() << "] column_unique_id["
<< col_unique_id
@@ -723,7 +740,9 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
return false;
}
- const auto& fs = rowset->rowset_meta()->fs();
+ auto fs = rowset->rowset_meta()->fs();
+ DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_get_fs_error",
+ { fs = nullptr; })
if (!fs) {
LOG(WARNING) << "get fs failed, resource_id="
<< rowset->rowset_meta()->resource_id();
@@ -731,6 +750,8 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
}
const auto* index_meta =
rowset->tablet_schema()->inverted_index(col_unique_id);
+ DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_index_meta_nullptr",
+ { index_meta = nullptr; })
if (index_meta == nullptr) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id() << "] column_unique_id["
<< col_unique_id << "] index meta is null, will skip index compaction";
@@ -740,6 +761,9 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
for (auto i = 0; i < rowset->num_segments(); i++) {
// TODO: inverted_index_path
auto seg_path = rowset->segment_path(i);
+ DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_seg_path_nullptr", {
+ seg_path = ResultError(Status::Error<ErrorCode::INTERNAL_ERROR>("error"));
+ })
if (!seg_path) {
LOG(WARNING) << seg_path.error();
return false;
@@ -757,6 +781,16 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
auto st = inverted_index_file_reader->init(
config::inverted_index_read_buffer_size,
open_idx_file_cache);
index_file_path =
inverted_index_file_reader->get_index_file_path(index_meta);
+ DBUG_EXECUTE_IF(
+ "Compaction::construct_skip_inverted_index_index_file_reader_init_"
+ "status_not_ok",
+ {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: "
+ "construct_skip_inverted_index_index_file_reader_init_"
+ "status_"
+ "not_ok");
+ })
if (!st.ok()) {
LOG(WARNING) << "init index " << index_file_path << "
error:" << st;
return false;
@@ -764,6 +798,14 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
// check index meta
auto result = inverted_index_file_reader->open(index_meta);
+ DBUG_EXECUTE_IF(
+ "Compaction::construct_skip_inverted_index_index_file_reader_open_"
+ "error",
+ {
+ result = ResultError(
+ Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError occur when open idx file"));
+ })
if (!result.has_value()) {
LOG(WARNING)
<< "open index " << index_file_path << "
error:" << result.error();
@@ -773,6 +815,12 @@ void Compaction::construct_index_compaction_columns(RowsetWriterContext& ctx) {
std::vector<std::string> files;
reader->list(&files);
reader->close();
+ DBUG_EXECUTE_IF(
+ "Compaction::construct_skip_inverted_index_index_reader_close_error",
+ { _CLTHROWA(CL_ERR_IO, "debug point: reader close error"); })
+
+ DBUG_EXECUTE_IF("Compaction::construct_skip_inverted_index_index_files_count",
+ { files.clear(); })
// why is 3?
// slice type index file at least has 3 files: null_bitmap, segments_N, segments.gen
diff --git a/be/src/olap/inverted_index_parser.cpp b/be/src/olap/inverted_index_parser.cpp
index a9ed7ec062e..f7e511970d9 100644
--- a/be/src/olap/inverted_index_parser.cpp
+++ b/be/src/olap/inverted_index_parser.cpp
@@ -128,6 +128,7 @@ std::string get_parser_ignore_above_value_from_properties(
std::string get_parser_stopwords_from_properties(
const std::map<std::string, std::string>& properties) {
+ DBUG_EXECUTE_IF("inverted_index_parser.get_parser_stopwords_from_properties", { return ""; })
if (properties.find(INVERTED_INDEX_PARSER_STOPWORDS_KEY) !=
properties.end()) {
return properties.at(INVERTED_INDEX_PARSER_STOPWORDS_KEY);
} else {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 96dc9295834..a0c5a05636b 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -78,6 +78,7 @@
#include "runtime/memory/cache_manager.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "util/countdown_latch.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/thread.h"
@@ -1134,6 +1135,8 @@ Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWo
Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) {
auto tablet_id = request.tablet_id;
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+ DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr",
+ { tablet = nullptr; })
if (tablet == nullptr) {
LOG(WARNING) << "tablet: " << tablet_id << " not exist";
return Status::InternalError("tablet not exist, tablet_id={}.",
tablet_id);
diff --git a/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h b/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h
index 561054863d7..bebbea58f72 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index/char_filter/char_filter_factory.h
@@ -27,6 +27,7 @@ class CharFilterFactory {
public:
template <typename... Args>
static lucene::analysis::CharFilter* create(const std::string& name,
Args&&... args) {
+ DBUG_EXECUTE_IF("CharFilterFactory::create_return_nullptr", { return
nullptr; })
if (name == INVERTED_INDEX_CHAR_FILTER_CHAR_REPLACE) {
return new CharReplaceCharFilter(std::forward<Args>(args)...);
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
index 7d1b348b95b..88a8f241722 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compaction.cpp
@@ -44,10 +44,16 @@ Status compact_column(int64_t index_id,
bool can_use_ram_dir = true;
lucene::store::Directory* dir = DorisFSDirectoryFactory::getDirectory(
io::global_local_filesystem(), tmp_path.data(), can_use_ram_dir);
+ DBUG_EXECUTE_IF("compact_column_getDirectory_error", {
+ _CLTHROWA(CL_ERR_IO, "debug point: compact_column_getDirectory_error
in index compaction");
+ })
lucene::analysis::SimpleAnalyzer<char> analyzer;
auto* index_writer = _CLNEW lucene::index::IndexWriter(dir, &analyzer, true /* create */,
true /* closeDirOnShutdown */);
-
+ DBUG_EXECUTE_IF("compact_column_create_index_writer_error", {
+ _CLTHROWA(CL_ERR_IO,
+ "debug point: compact_column_create_index_writer_error in
index compaction");
+ })
DCHECK_EQ(src_index_dirs.size(), trans_vec.size());
std::vector<lucene::store::Directory*>
tmp_src_index_dirs(src_index_dirs.size());
for (size_t i = 0; i < tmp_src_index_dirs.size(); ++i) {
@@ -55,8 +61,16 @@ Status compact_column(int64_t index_id,
}
index_writer->indexCompaction(tmp_src_index_dirs, dest_index_dirs,
trans_vec,
dest_segment_num_rows);
+ DBUG_EXECUTE_IF("compact_column_indexCompaction_error", {
+ _CLTHROWA(CL_ERR_IO,
+ "debug point: compact_column_indexCompaction_error in index
compaction");
+ })
index_writer->close();
+ DBUG_EXECUTE_IF("compact_column_index_writer_close_error", {
+ _CLTHROWA(CL_ERR_IO,
+ "debug point: compact_column_index_writer_close_error in
index compaction");
+ })
_CLDELETE(index_writer);
// NOTE: need to ref_cnt-- for dir,
// when index_writer is destroyed, if closeDir is set, dir will be close
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
index 70c1e55d1e8..0e2dbe7d6bd 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.cpp
@@ -17,6 +17,8 @@
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
+#include <glog/logging.h>
+
#include <filesystem>
#include "common/status.h"
@@ -44,11 +46,13 @@ Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index
index_meta->get_index_suffix());
bool exists = false;
auto st = local_fs->exists(local_fs_index_path, &exists);
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_error",
+ { st = Status::Error<ErrorCode::IO_ERROR>("debug point: no
such file error"); })
if (!st.ok()) {
LOG(ERROR) << "index_path:" << local_fs_index_path << " exists error:"
<< st;
return ResultError(st);
}
-
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::open_local_fs_exists_true", {
exists = true; })
if (exists) {
LOG(ERROR) << "try to init a directory:" << local_fs_index_path << "
already exists";
return ResultError(
@@ -75,6 +79,8 @@ Result<DorisFSDirectory*> InvertedIndexFileWriter::open(const TabletIndex* index
}
Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) {
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::delete_index_index_meta_nullptr",
+ { index_meta = nullptr; });
if (!index_meta) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("Index metadata is
null.");
}
@@ -84,6 +90,8 @@ Status InvertedIndexFileWriter::delete_index(const TabletIndex* index_meta) {
// Check if the specified index exists
auto index_it = _indices_dirs.find(std::make_pair(index_id, index_suffix));
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::delete_index_indices_dirs_reach_end",
+ { index_it = _indices_dirs.end(); })
if (index_it == _indices_dirs.end()) {
std::ostringstream errMsg;
errMsg << "No inverted index with id " << index_id << " and suffix "
<< index_suffix
@@ -136,7 +144,7 @@ Status InvertedIndexFileWriter::close() {
})
if (_storage_format == InvertedIndexStorageFormatPB::V1) {
try {
- _total_file_size = write_v1();
+ RETURN_IF_ERROR(write_v1());
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
// delete index path, which contains separated inverted index files
@@ -151,7 +159,7 @@ Status InvertedIndexFileWriter::close() {
}
} else {
try {
- _total_file_size = write_v2();
+ RETURN_IF_ERROR(write_v2());
for (const auto& entry : _indices_dirs) {
const auto& dir = entry.second;
// delete index path, which contains separated inverted index files
@@ -198,7 +206,12 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire
int64_t bufferLength) {
lucene::store::IndexInput* tmp = nullptr;
CLuceneError err;
- if (!dir->openInput(fileName, tmp, err)) {
+ auto open = dir->openInput(fileName, tmp, err);
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_openInput_error", {
+ open = false;
+ err.set(CL_ERR_IO, "debug point: copyFile_openInput_error");
+ });
+ if (!open) {
throw err;
}
@@ -214,6 +227,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire
output->writeBytes(buffer, len);
remainder -= len;
}
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_remainder_is_not_zero",
{ remainder = 10; });
if (remainder != 0) {
std::ostringstream errMsg;
errMsg << "Non-zero remainder length after copying: " << remainder <<
" (id: " << fileName
@@ -224,6 +238,8 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire
int64_t end_ptr = output->getFilePointer();
int64_t diff = end_ptr - start_ptr;
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::copyFile_diff_not_equals_length",
+ { diff = length - 10; });
if (diff != length) {
std::ostringstream errMsg;
errMsg << "Difference in the output file offsets " << diff
@@ -234,7 +250,7 @@ void InvertedIndexFileWriter::copyFile(const char* fileName, lucene::store::Dire
input->close();
}
-int64_t InvertedIndexFileWriter::write_v1() {
+Status InvertedIndexFileWriter::write_v1() {
int64_t total_size = 0;
for (const auto& entry : _indices_dirs) {
const int64_t index_id = entry.first.first;
@@ -267,6 +283,8 @@ int64_t InvertedIndexFileWriter::write_v1() {
// write file entries to ram directory to get header length
lucene::store::RAMDirectory ram_dir;
auto* out_idx = ram_dir.createOutput(idx_name.c_str());
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_ram_output_is_nullptr",
+ { out_idx = nullptr; })
if (out_idx == nullptr) {
LOG(WARNING) << "Write compound file error: RAMDirectory
output is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create RAMDirectory output error");
@@ -300,6 +318,8 @@ int64_t InvertedIndexFileWriter::write_v1() {
out_dir->set_file_writer_opts(_opts);
auto* out = out_dir->createOutput(idx_name.c_str());
+ DBUG_EXECUTE_IF("InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
+ { out = nullptr; });
if (out == nullptr) {
LOG(WARNING) << "Write compound file error: CompoundDirectory
output is nullptr.";
_CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
@@ -351,106 +371,123 @@ int64_t InvertedIndexFileWriter::write_v1() {
auto* new_index_info = _file_info.add_index_info();
*new_index_info = index_info;
} catch (CLuceneError& err) {
- LOG(ERROR) << "CLuceneError occur when close idx file "
- << InvertedIndexDescriptor::get_index_file_path_v1(_index_path_prefix,
- index_id, index_suffix)
+ auto index_path = InvertedIndexDescriptor::get_index_file_path_v1(
+ _index_path_prefix, index_id, index_suffix);
+ LOG(ERROR) << "CLuceneError occur when write_v1 idx file " << index_path
<< " error msg: " << err.what();
- throw err;
+ return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError occur when write_v1 idx file: {}, error msg: {}", index_path,
+ err.what());
}
}
- return total_size;
+ _total_file_size = total_size;
+ return Status::OK();
}
-int64_t InvertedIndexFileWriter::write_v2() {
- // Create the output stream to write the compound file
- int64_t current_offset = headerLength();
-
+Status InvertedIndexFileWriter::write_v2() {
io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
+ std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+ try {
+ // Create the output stream to write the compound file
+ int64_t current_offset = headerLength();
- auto* out_dir = DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str());
- out_dir->set_file_writer_opts(_opts);
+ io::Path index_path {InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)};
- std::unique_ptr<lucene::store::IndexOutput> compound_file_output;
+ auto* out_dir =
+ DorisFSDirectoryFactory::getDirectory(_fs, index_path.parent_path().c_str());
+ out_dir->set_file_writer_opts(_opts);
- DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr";
- compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
- out_dir->createOutputV2(_idx_v2_writer.get()));
+ DCHECK(_idx_v2_writer != nullptr) << "inverted index file writer v2 is nullptr";
+ compound_file_output = std::unique_ptr<lucene::store::IndexOutput>(
+ out_dir->createOutputV2(_idx_v2_writer.get()));
- // Write the version number
- compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2);
+ // Write the version number
+ compound_file_output->writeInt(InvertedIndexStorageFormatPB::V2);
- // Write the number of indices
- const auto numIndices = static_cast<uint32_t>(_indices_dirs.size());
- compound_file_output->writeInt(numIndices);
+ // Write the number of indices
+ const auto numIndices = static_cast<uint32_t>(_indices_dirs.size());
+ compound_file_output->writeInt(numIndices);
- std::vector<std::tuple<std::string, int64_t, int64_t,
CL_NS(store)::Directory*>>
- file_metadata; // Store file name, offset, file length, and
corresponding directory
+ std::vector<std::tuple<std::string, int64_t, int64_t,
CL_NS(store)::Directory*>>
+ file_metadata; // Store file name, offset, file length, and
corresponding directory
- // First, write all index information and file metadata
- for (const auto& entry : _indices_dirs) {
- const int64_t index_id = entry.first.first;
- const auto& index_suffix = entry.first.second;
- const auto& dir = entry.second;
- std::vector<std::string> files;
- dir->list(&files);
+ // First, write all index information and file metadata
+ for (const auto& entry : _indices_dirs) {
+ const int64_t index_id = entry.first.first;
+ const auto& index_suffix = entry.first.second;
+ const auto& dir = entry.second;
+ std::vector<std::string> files;
+ dir->list(&files);
- auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE);
- if (it != files.end()) {
- files.erase(it);
- }
- // sort file list by file length
- std::vector<std::pair<std::string, int64_t>> sorted_files;
- for (const auto& file : files) {
- sorted_files.emplace_back(file, dir->fileLength(file.c_str()));
- }
+ auto it = std::find(files.begin(), files.end(), DorisFSDirectory::WRITE_LOCK_FILE);
+ if (it != files.end()) {
+ files.erase(it);
+ }
+ // sort file list by file length
+ std::vector<std::pair<std::string, int64_t>> sorted_files;
+ for (const auto& file : files) {
+ sorted_files.emplace_back(file, dir->fileLength(file.c_str()));
+ }
- std::sort(sorted_files.begin(), sorted_files.end(),
- [](const std::pair<std::string, int64_t>& a,
- const std::pair<std::string, int64_t>& b) { return (a.second < b.second); });
-
- int32_t file_count = sorted_files.size();
-
- // Write the index ID and the number of files
- compound_file_output->writeLong(index_id);
- compound_file_output->writeInt(static_cast<int32_t>(index_suffix.length()));
- compound_file_output->writeBytes(reinterpret_cast<const uint8_t*>(index_suffix.data()),
- index_suffix.length());
- compound_file_output->writeInt(file_count);
-
- // Calculate the offset for each file and write the file metadata
- for (const auto& file : sorted_files) {
- int64_t file_length = dir->fileLength(file.first.c_str());
- compound_file_output->writeInt(static_cast<int32_t>(file.first.length()));
- compound_file_output->writeBytes(reinterpret_cast<const uint8_t*>(file.first.data()),
- file.first.length());
- compound_file_output->writeLong(current_offset);
- compound_file_output->writeLong(file_length);
-
- file_metadata.emplace_back(file.first, current_offset, file_length, dir.get());
- current_offset += file_length; // Update the data offset
+ std::sort(
+ sorted_files.begin(), sorted_files.end(),
+ [](const std::pair<std::string, int64_t>& a,
+ const std::pair<std::string, int64_t>& b) { return (a.second < b.second); });
+
+ int32_t file_count = sorted_files.size();
+
+ // Write the index ID and the number of files
+ compound_file_output->writeLong(index_id);
+ compound_file_output->writeInt(static_cast<int32_t>(index_suffix.length()));
+ compound_file_output->writeBytes(reinterpret_cast<const uint8_t*>(index_suffix.data()),
+ index_suffix.length());
+ compound_file_output->writeInt(file_count);
+
+ // Calculate the offset for each file and write the file metadata
+ for (const auto& file : sorted_files) {
+ int64_t file_length = dir->fileLength(file.first.c_str());
+ compound_file_output->writeInt(static_cast<int32_t>(file.first.length()));
+ compound_file_output->writeBytes(
+ reinterpret_cast<const uint8_t*>(file.first.data()), file.first.length());
+ compound_file_output->writeLong(current_offset);
+ compound_file_output->writeLong(file_length);
+
+ file_metadata.emplace_back(file.first, current_offset, file_length, dir.get());
+ current_offset += file_length; // Update the data offset
+ }
}
- }
- const int64_t buffer_length = 16384;
- uint8_t header_buffer[buffer_length];
+ const int64_t buffer_length = 16384;
+ uint8_t header_buffer[buffer_length];
- // Next, write the file data
- for (const auto& info : file_metadata) {
- const std::string& file = std::get<0>(info);
- auto* dir = std::get<3>(info);
+ // Next, write the file data
+ for (const auto& info : file_metadata) {
+ const std::string& file = std::get<0>(info);
+ auto* dir = std::get<3>(info);
- // Write the actual file data
- copyFile(file.c_str(), dir, compound_file_output.get(), header_buffer, buffer_length);
- }
+ // Write the actual file data
+ copyFile(file.c_str(), dir, compound_file_output.get(), header_buffer, buffer_length);
+ }
- out_dir->close();
- // NOTE: need to decrease ref count, but not to delete here,
- // because index cache may get the same directory from DIRECTORIES
- _CLDECDELETE(out_dir)
- auto compound_file_size = compound_file_output->getFilePointer();
- compound_file_output->close();
- _file_info.set_index_size(compound_file_size);
- return compound_file_size;
+ out_dir->close();
+ // NOTE: need to decrease ref count, but not to delete here,
+ // because index cache may get the same directory from DIRECTORIES
+ _CLDECDELETE(out_dir)
+ _total_file_size = compound_file_output->getFilePointer();
+ compound_file_output->close();
+ _file_info.set_index_size(_total_file_size);
+ } catch (CLuceneError& err) {
+ LOG(ERROR) << "CLuceneError occur when close idx file " << index_path
+ << " error msg: " << err.what();
+ if (compound_file_output) {
+ compound_file_output->close();
+ compound_file_output.reset();
+ }
+ return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError occur when close idx file: {}, error msg: {}",
index_path.c_str(),
+ err.what());
+ }
+ return Status::OK();
}
-} // namespace doris::segment_v2
\ No newline at end of file
+} // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
index ccd6953cdd7..31e287d6dd3 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_writer.h
@@ -64,8 +64,8 @@ public:
Status delete_index(const TabletIndex* index_meta);
Status initialize(InvertedIndexDirectoryMap& indices_dirs);
~InvertedIndexFileWriter() = default;
- int64_t write_v2();
- int64_t write_v1();
+ Status write_v2();
+ Status write_v1();
Status close();
int64_t headerLength();
const InvertedIndexFileInfo* get_index_file_info() const {
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
index f752c530020..ded71c8a6cc 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -183,7 +183,10 @@ DorisFSDirectory::FSIndexInput::SharedHandle::SharedHandle(const char* path) {
DorisFSDirectory::FSIndexInput::SharedHandle::~SharedHandle() {
if (_reader) {
- if (_reader->close().ok()) {
+ auto st = _reader->close();
+ DBUG_EXECUTE_IF("FSIndexInput::~SharedHandle_reader_close_error",
+ { st = Status::Error<doris::ErrorCode::NOT_FOUND>("failed to close"); });
+ if (st.ok()) {
_reader = nullptr;
}
}
@@ -238,10 +241,17 @@ void DorisFSDirectory::FSIndexInput::readInternal(uint8_t* b, const int32_t len)
Slice result {b, (size_t)len};
size_t bytes_read = 0;
- if (!_handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx).ok()) {
+ auto st = _handle->_reader->read_at(_pos, result, &bytes_read, &_io_ctx);
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error", {
+ st = Status::InternalError(
+ "debug point: DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error");
+ })
+ if (!st.ok()) {
_CLTHROWA(CL_ERR_IO, "read past EOF");
}
bufferLength = len;
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexInput::readInternal_bytes_read_error",
+ { bytes_read = len + 10; })
if (bytes_read != len) {
_CLTHROWA(CL_ERR_IO, "read error");
}
@@ -313,6 +323,10 @@ void DorisFSDirectory::FSIndexOutput::flushBuffer(const uint8_t* b, const int32_
_CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error");
}
} else {
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::flushBuffer_writer_is_nullptr",
+ { _writer = nullptr; })
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput::flushBuffer_b_is_nullptr",
+ { b = nullptr; })
if (_writer == nullptr) {
LOG(WARNING) << "File writer is nullptr in
DorisFSDirectory::FSIndexOutput, "
"ignore flush.";
@@ -327,8 +341,7 @@ void DorisFSDirectory::FSIndexOutput::close() {
try {
BufferedIndexOutput::close();
DBUG_EXECUTE_IF(
- "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_"
- "close",
+ "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close",
{
_CLTHROWA(CL_ERR_IO,
"debug point: test throw error in
bufferedindexoutput close");
@@ -342,6 +355,10 @@ void DorisFSDirectory::FSIndexOutput::close() {
_writer.reset(nullptr);
_CLTHROWA(err.number(), err.what());
}
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput.set_writer_nullptr", {
+ LOG(WARNING) << "Dbug execute, set _writer to nullptr";
+ _writer = nullptr;
+ })
if (_writer) {
auto ret = _writer->close();
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
@@ -353,6 +370,7 @@ void DorisFSDirectory::FSIndexOutput::close() {
}
} else {
LOG(WARNING) << "File writer is nullptr, ignore finalize and close.";
+ _CLTHROWA(CL_ERR_IO, "close file writer error, _writer = nullptr");
}
_writer.reset(nullptr);
}
@@ -364,13 +382,9 @@ int64_t DorisFSDirectory::FSIndexOutput::length() const {
void DorisFSDirectory::FSIndexOutputV2::init(io::FileWriter* file_writer) {
_index_v2_file_writer = file_writer;
- DBUG_EXECUTE_IF(
- "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_"
- "init",
- {
- _CLTHROWA(CL_ERR_IO,
- "debug point: test throw error in fsindexoutput init mock error");
- })
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init", {
+ _CLTHROWA(CL_ERR_IO, "debug point: test throw error in fsindexoutput init mock error");
+ })
}
DorisFSDirectory::FSIndexOutputV2::~FSIndexOutputV2() {}
@@ -393,6 +407,10 @@ void DorisFSDirectory::FSIndexOutputV2::flushBuffer(const uint8_t* b, const int3
_CLTHROWA(CL_ERR_IO, "writer append data when flushBuffer error");
}
} else {
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutputV2::flushBuffer_file_writer_is_nullptr",
+ { _index_v2_file_writer = nullptr; })
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutputV2::flushBuffer_b_is_nullptr",
+ { b = nullptr; })
if (_index_v2_file_writer == nullptr) {
LOG(WARNING) << "File writer is nullptr in
DorisFSDirectory::FSIndexOutputV2, "
"ignore flush.";
@@ -408,8 +426,7 @@ void DorisFSDirectory::FSIndexOutputV2::close() {
try {
BufferedIndexOutput::close();
DBUG_EXECUTE_IF(
- "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_"
- "close",
+ "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close",
{
_CLTHROWA(CL_ERR_IO,
"debug point: test throw error in
bufferedindexoutput close");
@@ -422,6 +439,10 @@ void DorisFSDirectory::FSIndexOutputV2::close() {
}
_CLTHROWA(err.number(), err.what());
}
+ DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput.set_writer_nullptr", {
+ LOG(WARNING) << "Dbug execute, set _index_v2_file_writer to nullptr";
+ _index_v2_file_writer = nullptr;
+ })
if (_index_v2_file_writer) {
auto ret = _index_v2_file_writer->close();
DBUG_EXECUTE_IF("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
@@ -480,7 +501,16 @@ bool DorisFSDirectory::list(std::vector<std::string>* names) const {
priv_getFN(fl, "");
std::vector<io::FileInfo> files;
bool exists;
- LOG_AND_THROW_IF_ERROR(_fs->list(fl, true, &files, &exists), "List file IO
error");
+ auto st = _fs->list(fl, true, &files, &exists);
+ DBUG_EXECUTE_IF("DorisFSDirectory::list_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: DorisFSDirectory::list_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "List file IO error");
+ DBUG_EXECUTE_IF("DorisFSDirectory::list_directory_not_exists", { exists =
false; })
+ if (!exists) {
+ LOG_AND_THROW_IF_ERROR(st, fmt::format("Directory {} is not exist",
fl));
+ }
for (auto& file : files) {
names->push_back(file.file_name);
}
@@ -492,7 +522,12 @@ bool DorisFSDirectory::fileExists(const char* name) const {
char fl[CL_MAX_DIR];
priv_getFN(fl, name);
bool exists = false;
- LOG_AND_THROW_IF_ERROR(_fs->exists(fl, &exists), "File exists IO error");
+ auto st = _fs->exists(fl, &exists);
+ DBUG_EXECUTE_IF("DorisFSDirectory::fileExists_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: DorisFSDirectory::fileExists_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "File exists IO error");
return exists;
}
@@ -518,7 +553,12 @@ void DorisFSDirectory::touchFile(const char* name) {
snprintf(buffer, CL_MAX_DIR, "%s%s%s", directory.c_str(), PATH_DELIMITERA,
name);
io::FileWriterPtr tmp_writer;
- LOG_AND_THROW_IF_ERROR(_fs->create_file(buffer, &tmp_writer), "Touch file
IO error");
+ auto st = _fs->create_file(buffer, &tmp_writer);
+ DBUG_EXECUTE_IF("DorisFSDirectory::touchFile_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: DorisFSDirectory::touchFile_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "Touch file IO error");
}
int64_t DorisFSDirectory::fileLength(const char* name) const {
@@ -532,6 +572,10 @@ int64_t DorisFSDirectory::fileLength(const char* name) const {
if (st.code() == ErrorCode::NOT_FOUND) {
_CLTHROWA(CL_ERR_FileNotFound, "File does not exist");
}
+ DBUG_EXECUTE_IF("DorisFSDirectory::fileLength_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: DorisFSDirectory::fileLength_status_is_not_ok");
+ })
LOG_AND_THROW_IF_ERROR(st, "Get file size IO error");
return size;
}
@@ -544,13 +588,21 @@ bool DorisFSDirectory::openInput(const char* name, lucene::store::IndexInput*& r
return FSIndexInput::open(_fs, fl, ret, error, bufferSize);
}
-void DorisFSDirectory::close() {}
+void DorisFSDirectory::close() {
+ DBUG_EXECUTE_IF("DorisFSDirectory::close_close_with_error",
+ { _CLTHROWA(CL_ERR_IO, "debug_point: close
DorisFSDirectory error"); })
+}
bool DorisFSDirectory::doDeleteFile(const char* name) {
CND_PRECONDITION(directory[0] != 0, "directory is not open");
char fl[CL_MAX_DIR];
priv_getFN(fl, name);
- LOG_AND_THROW_IF_ERROR(_fs->delete_file(fl), "Delete file IO error");
+ auto st = _fs->delete_file(fl);
+ DBUG_EXECUTE_IF("DorisFSDirectory::doDeleteFile_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point:
DorisFSDirectory::doDeleteFile_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "Delete file IO error");
return true;
}
@@ -558,8 +610,12 @@ bool DorisFSDirectory::deleteDirectory() {
CND_PRECONDITION(directory[0] != 0, "directory is not open");
char fl[CL_MAX_DIR];
priv_getFN(fl, "");
- LOG_AND_THROW_IF_ERROR(_fs->delete_directory(fl),
- fmt::format("Delete directory {} IO error", fl));
+ auto st = _fs->delete_directory(fl);
+ DBUG_EXECUTE_IF("DorisFSDirectory::deleteDirectory_throw_is_not_directory", {
+ st = Status::Error<ErrorCode::NOT_FOUND>(
+ fmt::format("debug point: {} is not a directory", fl));
+ })
+ LOG_AND_THROW_IF_ERROR(st, fmt::format("Delete directory {} IO error",
fl));
return true;
}
@@ -573,11 +629,26 @@ void DorisFSDirectory::renameFile(const char* from, const char* to) {
priv_getFN(nu, to);
bool exists = false;
- LOG_AND_THROW_IF_ERROR(_fs->exists(nu, &exists), "File exists IO error");
+ auto st = _fs->exists(nu, &exists);
+ DBUG_EXECUTE_IF("DorisFSDirectory::renameFile_exists_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point:
DorisFSDirectory::renameFile_exists_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "File exists IO error");
if (exists) {
- LOG_AND_THROW_IF_ERROR(_fs->delete_directory(nu), fmt::format("Delete
{} IO error", nu));
+ st = _fs->delete_directory(nu);
+ DBUG_EXECUTE_IF("DorisFSDirectory::renameFile_delete_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: DorisFSDirectory::renameFile_delete_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, fmt::format("Delete {} IO error", nu));
}
- LOG_AND_THROW_IF_ERROR(_fs->rename(old, nu), fmt::format("Rename {} to {} IO error", old, nu));
+ st = _fs->rename(old, nu);
+ DBUG_EXECUTE_IF("DorisFSDirectory::renameFile_rename_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: DorisFSDirectory::renameFile_rename_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, fmt::format("Rename {} to {} IO error", old, nu));
}
lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) {
@@ -585,11 +656,31 @@ lucene::store::IndexOutput* DorisFSDirectory::createOutput(const char* name) {
char fl[CL_MAX_DIR];
priv_getFN(fl, name);
bool exists = false;
- LOG_AND_THROW_IF_ERROR(_fs->exists(fl, &exists), "Create output file
exists IO error");
+ auto st = _fs->exists(fl, &exists);
+ DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_exists_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point:
DorisFSDirectory::createOutput_exists_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "Create output file exists IO error");
if (exists) {
- LOG_AND_THROW_IF_ERROR(_fs->delete_file(fl),
- fmt::format("Create output delete file {} IO
error", fl));
- LOG_AND_THROW_IF_ERROR(_fs->exists(fl, &exists), "Create output file
exists IO error");
+ st = _fs->delete_file(fl);
+
DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_delete_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point:
DorisFSDirectory::createOutput_delete_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, fmt::format("Create output delete file {}
IO error", fl));
+ st = _fs->exists(fl, &exists);
+ DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_exists_after_delete_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: "
+ "DorisFSDirectory::createOutput_exists_after_delete_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "Create output file exists IO error");
+ DBUG_EXECUTE_IF("DorisFSDirectory::createOutput_exists_after_delete_error",
+ { exists = true; })
+ if (exists) {
+ _CLTHROWA(CL_ERR_IO, fmt::format("File {} should not exist", fl).c_str());
+ }
assert(!exists);
}
auto* ret = _CLNEW FSIndexOutput();
@@ -653,6 +744,10 @@ bool DorisRAMFSDirectory::fileExists(const char* name) const {
int64_t DorisRAMFSDirectory::fileModified(const char* name) const {
std::lock_guard<std::mutex> wlock(_this_lock);
auto* f = filesMap->get((char*)name);
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::fileModified_file_not_found", { f =
nullptr; })
+ if (f == nullptr) {
+ _CLTHROWA(CL_ERR_IO, fmt::format("NOT FOUND File {}.", name).c_str());
+ }
return f->getLastModified();
}
@@ -661,6 +756,10 @@ void DorisRAMFSDirectory::touchFile(const char* name) {
{
std::lock_guard<std::mutex> wlock(_this_lock);
file = filesMap->get((char*)name);
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::touchFile_file_not_found", {
file = nullptr; })
+ if (file == nullptr) {
+ _CLTHROWA(CL_ERR_IO, fmt::format("NOT FOUND File {}.", name).c_str());
+ }
}
const uint64_t ts1 = file->getLastModified();
uint64_t ts2 = lucene::util::Misc::currentTimeMillis();
@@ -677,6 +776,10 @@ void DorisRAMFSDirectory::touchFile(const char* name) {
int64_t DorisRAMFSDirectory::fileLength(const char* name) const {
std::lock_guard<std::mutex> wlock(_this_lock);
auto* f = filesMap->get((char*)name);
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::fileLength_file_not_found", { f =
nullptr; })
+ if (f == nullptr) {
+ _CLTHROWA(CL_ERR_IO, fmt::format("NOT FOUND File {}.", name).c_str());
+ }
return f->getLength();
}
@@ -684,6 +787,7 @@ bool DorisRAMFSDirectory::openInput(const char* name, lucene::store::IndexInput*
CLuceneError& error, int32_t bufferSize) {
std::lock_guard<std::mutex> wlock(_this_lock);
auto* file = filesMap->get((char*)name);
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::openInput_file_not_found", { file =
nullptr; })
if (file == nullptr) {
error.set(CL_ERR_IO,
"[DorisRAMCompoundDirectory::open] The requested file does
not exist.");
@@ -695,6 +799,8 @@ bool DorisRAMFSDirectory::openInput(const char* name, lucene::store::IndexInput*
void DorisRAMFSDirectory::close() {
DorisFSDirectory::close();
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::close_close_with_error",
+ { _CLTHROWA(CL_ERR_IO, "debug_point: close
DorisRAMFSDirectory error"); })
}
bool DorisRAMFSDirectory::doDeleteFile(const char* name) {
@@ -730,6 +836,7 @@ void DorisRAMFSDirectory::renameFile(const char* from, const char* to) {
sizeInBytes -= itr1->second->sizeInBytes;
filesMap->removeitr(itr1);
}
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::renameFile_itr_filesMap_end", { itr
= filesMap->end(); })
if (itr == filesMap->end()) {
char tmp[1024];
snprintf(tmp, 1024, "cannot rename %s, file does not exist", from);
@@ -752,6 +859,8 @@ lucene::store::IndexOutput* DorisRAMFSDirectory::createOutput(const char* name)
// get the actual pointer to the output name
char* n = nullptr;
auto itr = filesMap->find(const_cast<char*>(name));
+ DBUG_EXECUTE_IF("DorisRAMFSDirectory::createOutput_itr_filesMap_end",
+ { itr = filesMap->end(); })
if (itr != filesMap->end()) {
n = itr->first;
lucene::store::RAMFile* rf = itr->second;
@@ -784,6 +893,7 @@ DorisFSDirectory* DorisFSDirectoryFactory::getDirectory(const io::FileSystemSPtr
const char* _file,
bool can_use_ram_dir,
lucene::store::LockFactory* lock_factory) {
DorisFSDirectory* dir = nullptr;
+ DBUG_EXECUTE_IF("DorisFSDirectoryFactory::getDirectory_file_is_nullptr", {
_file = nullptr; });
if (!_file || !*_file) {
_CLTHROWA(CL_ERR_IO, "Invalid directory");
}
@@ -797,10 +907,22 @@ DorisFSDirectory* DorisFSDirectoryFactory::getDirectory(const io::FileSystemSPtr
dir = _CLNEW DorisRAMFSDirectory();
} else {
bool exists = false;
- LOG_AND_THROW_IF_ERROR(_fs->exists(file, &exists), "Get directory
exists IO error");
+ auto st = _fs->exists(file, &exists);
+ DBUG_EXECUTE_IF("DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point:
DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "Get directory exists IO error");
if (!exists) {
- LOG_AND_THROW_IF_ERROR(_fs->create_directory(file),
- "Get directory create directory IO error");
+ st = _fs->create_directory(file);
+ DBUG_EXECUTE_IF(
+ "DorisFSDirectoryFactory::getDirectory_create_directory_status_is_not_ok", {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: "
+ "DorisFSDirectoryFactory::getDirectory_create_directory_status_is_"
+ "not_ok");
+ })
+ LOG_AND_THROW_IF_ERROR(st, "Get directory create directory IO
error");
}
dir = _CLNEW DorisFSDirectory();
}
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 50874d0db5c..29fe4609e59 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -118,6 +118,12 @@ public:
Status init() override {
try {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::init_field_type_not_supported", {
+ return Status::Error<doris::ErrorCode::INVERTED_INDEX_NOT_SUPPORTED>(
+ "Field type not supported");
+ })
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::init_inverted_index_writer_init_error",
+ { _CLTHROWA(CL_ERR_IO, "debug point: init index error"); })
if constexpr (field_is_slice_type(field_type)) {
return init_fulltext_index();
} else if constexpr (field_is_numeric_type(field_type)) {
@@ -141,6 +147,8 @@ public:
void close_on_error() override {
try {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::close_on_error_throw_exception",
+ { _CLTHROWA(CL_ERR_IO, "debug point: close on error"); })
if (_index_writer) {
_index_writer->close();
}
@@ -160,6 +168,9 @@ public:
_bkd_writer = std::make_shared<lucene::util::bkd::bkd_writer>(
max_doc, DIMS, DIMS, value_length, MAX_LEAF_COUNT, MAXMBSortInHeap,
total_point_count, true, config::max_depth_in_bkd_tree);
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::init_bkd_index_throw_error", {
+ _CLTHROWA(CL_ERR_IllegalArgument, "debug point: create bkd_writer error");
+ })
return open_index_directory();
}
@@ -174,6 +185,10 @@ public:
}
Status open_index_directory() {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::open_index_directory_error", {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: open_index_directory_error");
+ })
_dir = DORIS_TRY(_index_file_writer->open(_index_meta));
return Status::OK();
}
@@ -183,6 +198,12 @@ public:
bool close_dir_on_shutdown = true;
auto index_writer = std::make_unique<lucene::index::IndexWriter>(
_dir, _analyzer.get(), create_index, close_dir_on_shutdown);
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error",
+ { index_writer->setRAMBufferSizeMB(-100); })
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error",
+ { index_writer->setMaxBufferedDocs(1); })
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_index_writer_setMergeFactor_error",
+ { index_writer->setMergeFactor(1); })
index_writer->setRAMBufferSizeMB(config::inverted_index_ram_buffer_size);
index_writer->setMaxBufferedDocs(config::inverted_index_max_buffered_docs);
index_writer->setMaxFieldLength(MAX_FIELD_LEN);
@@ -247,6 +268,8 @@ public:
try {
_index_writer->addDocument(_doc.get());
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_document_throw_error",
+ { _CLTHROWA(CL_ERR_IO, "debug point: add_document io error"); })
} catch (const CLuceneError& e) {
close_on_error();
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
@@ -258,6 +281,8 @@ public:
Status add_null_document() {
try {
_index_writer->addNullDocument(_doc.get());
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_null_document_throw_error",
+ { _CLTHROWA(CL_ERR_IO, "debug point: add_null_document io error"); })
} catch (const CLuceneError& e) {
close_on_error();
return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
@@ -270,6 +295,10 @@ public:
_null_bitmap.addRange(_rid, _rid + count);
_rid += count;
if constexpr (field_is_slice_type(field_type)) {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_nulls_field_nullptr",
+ { _field = nullptr; })
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_nulls_index_writer_nullptr",
+ { _index_writer = nullptr; })
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index
writer.";
return Status::InternalError(
@@ -288,17 +317,30 @@ public:
return Status::OK();
}
- void new_inverted_index_field(const char* field_value_data, size_t field_value_size) {
- if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN &&
- _parser_type != InvertedIndexParserType::PARSER_NONE) {
- new_char_token_stream(field_value_data, field_value_size, _field);
- } else {
- new_field_char_value(field_value_data, field_value_size, _field);
+ Status new_inverted_index_field(const char* field_value_data, size_t field_value_size) {
+ try {
+ if (_parser_type != InvertedIndexParserType::PARSER_UNKNOWN &&
+ _parser_type != InvertedIndexParserType::PARSER_NONE) {
+ new_char_token_stream(field_value_data, field_value_size, _field);
+ } else {
+ new_field_char_value(field_value_data, field_value_size, _field);
+ }
+ } catch (const CLuceneError& e) {
+ return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+ "CLuceneError create new index field error: {}", e.what());
}
+ return Status::OK();
}
void new_char_token_stream(const char* s, size_t len,
lucene::document::Field* field) {
_char_string_reader->init(s, len, false);
+ DBUG_EXECUTE_IF(
+ "InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_"
+ "error",
+ {
+ _CLTHROWA(CL_ERR_UnsupportedOperation,
+ "UnsupportedOperationException: CLStream::init");
+ })
auto* stream = _analyzer->reusableTokenStream(field->name(),
_char_string_reader.get());
field->setValue(stream);
}
@@ -316,6 +358,10 @@ public:
Status add_values(const std::string fn, const void* values, size_t count) override {
if constexpr (field_is_slice_type(field_type)) {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_values_field_is_nullptr",
+ { _field = nullptr; })
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr",
+ { _index_writer = nullptr; })
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index
writer.";
return Status::InternalError(
@@ -329,7 +375,7 @@ public:
(_parser_type != InvertedIndexParserType::PARSER_NONE &&
v->empty())) {
RETURN_IF_ERROR(add_null_document());
} else {
- new_inverted_index_field(v->get_data(), v->get_size());
+ RETURN_IF_ERROR(new_inverted_index_field(v->get_data(),
v->get_size()));
RETURN_IF_ERROR(add_document());
}
++v;
@@ -343,12 +389,17 @@ public:
Status add_array_values(size_t field_size, const void* value_ptr, const uint8_t* null_map,
const uint8_t* offsets_ptr, size_t count) override {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
+ { count = 0; })
if (count == 0) {
// no values to add inverted index
return Status::OK();
}
const auto* offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
if constexpr (field_is_slice_type(field_type)) {
+ DBUG_EXECUTE_IF(
+ "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr",
+ { _index_writer = nullptr; })
if (_index_writer == nullptr) {
LOG(ERROR) << "index writer is null in inverted index writer.";
return Status::InternalError("index writer is null in inverted
index writer");
@@ -374,7 +425,15 @@ public:
continue;
} else {
// now we temp create field . later make a pool
- if (Status st = create_field(&new_field); st != Status::OK()) {
+ Status st = create_field(&new_field);
+ DBUG_EXECUTE_IF(
+ "InvertedIndexColumnWriterImpl::add_array_values_create_field_"
+ "error",
+ {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: add_array_values_create_field_error");
+ })
+ if (st != Status::OK()) {
LOG(ERROR) << "create field "
<< string(_field_name.begin(),
_field_name.end())
<< " error:" << st;
@@ -426,7 +485,14 @@ public:
// avoid to add doc which without any field which may make threadState init skip
// init fieldDataArray, then will make error with next doc with fields in
// resetCurrentFieldData
- if (Status st = create_field(&new_field); st != Status::OK()) {
+ Status st = create_field(&new_field);
+ DBUG_EXECUTE_IF(
+ "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2",
+ {
+ st = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "debug point: add_array_values_create_field_error_2");
+ })
+ if (st != Status::OK()) {
LOG(ERROR)
<< "create field " <<
string(_field_name.begin(), _field_name.end())
<< " error:" << st;
@@ -460,6 +526,11 @@ public:
Status add_array_values(size_t field_size, const CollectionValue* values,
size_t count) override {
if constexpr (field_is_slice_type(field_type)) {
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriterImpl::add_array_values_field_is_nullptr",
+ { _field = nullptr; })
+ DBUG_EXECUTE_IF(
+ "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr",
+ { _index_writer = nullptr; })
if (_field == nullptr || _index_writer == nullptr) {
LOG(ERROR) << "field or index writer is null in inverted index
writer.";
return Status::InternalError(
@@ -478,7 +549,7 @@ public:
item_data_ptr = (uint8_t*)item_data_ptr + field_size;
}
auto value = join(strings, " ");
- new_inverted_index_field(value.c_str(), value.length());
+ RETURN_IF_ERROR(new_inverted_index_field(value.c_str(), value.length()));
_rid++;
RETURN_IF_ERROR(add_document());
values++;
@@ -668,6 +739,8 @@ Status InvertedIndexColumnWriter::create(const Field* field,
bool single_field = true;
if (type == FieldType::OLAP_FIELD_TYPE_ARRAY) {
const auto* array_typeinfo = dynamic_cast<const ArrayTypeInfo*>(typeinfo);
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_array_typeinfo_is_nullptr",
+ { array_typeinfo = nullptr; })
if (array_typeinfo != nullptr) {
typeinfo = array_typeinfo->item_type_info();
type = typeinfo->type();
@@ -678,6 +751,8 @@ Status InvertedIndexColumnWriter::create(const Field* field,
}
}
+ DBUG_EXECUTE_IF("InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index",
+ { type = FieldType::OLAP_FIELD_TYPE_FLOAT; })
switch (type) {
#define M(TYPE) \
case TYPE: \
diff --git a/be/src/olap/task/index_builder.cpp b/be/src/olap/task/index_builder.cpp
index 975920a437e..2ce31527f61 100644
--- a/be/src/olap/task/index_builder.cpp
+++ b/be/src/olap/task/index_builder.cpp
@@ -68,8 +68,11 @@ Status IndexBuilder::update_inverted_index_info() {
_output_rowsets.reserve(_input_rowsets.size());
_pending_rs_guards.reserve(_input_rowsets.size());
for (auto&& input_rowset : _input_rowsets) {
- if (!input_rowset->is_local()) [[unlikely]] {
- DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id();
+ bool is_local_rowset = input_rowset->is_local();
+ DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_is_local_rowset",
+ { is_local_rowset = false; })
+ if (!is_local_rowset) [[unlikely]] {
+ // DCHECK(false) << _tablet->tablet_id() << ' ' << input_rowset->rowset_id();
return Status::InternalError("should be local rowset. tablet_id={}
rowset_id={}",
_tablet->tablet_id(),
input_rowset->rowset_id().to_string());
@@ -81,6 +84,9 @@ Status IndexBuilder::update_inverted_index_info() {
size_t total_index_size = 0;
auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get());
auto size_st = beta_rowset->get_inverted_index_size(&total_index_size);
+ DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", {
+ size_st = Status::Error<ErrorCode::INIT_FAILED>("debug point: get fs failed");
+ })
if (!size_st.ok() &&
!size_st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>() &&
!size_st.is<ErrorCode::NOT_FOUND>()) {
return size_st;
@@ -229,6 +235,11 @@ Status IndexBuilder::update_inverted_index_info() {
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
output_rs_tablet_schema->get_inverted_index_storage_format());
auto st = idx_file_reader->init();
+ DBUG_EXECUTE_IF(
+ "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok", {
+ st = Status::Error<ErrorCode::INIT_FAILED>(
+ "debug point: reader init error");
+ })
if (!st.ok() &&
!st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
return st;
}
@@ -262,8 +273,11 @@ Status IndexBuilder::update_inverted_index_info() {
Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta,
std::vector<segment_v2::SegmentSharedPtr>& segments) {
- if (!output_rowset_meta->is_local()) [[unlikely]] {
- DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id();
+ bool is_local_rowset = output_rowset_meta->is_local();
+ DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_is_local_rowset",
+ { is_local_rowset = false; })
+ if (!is_local_rowset) [[unlikely]] {
+ // DCHECK(false) << _tablet->tablet_id() << ' ' << output_rowset_meta->rowset_id();
return Status::InternalError("should be local rowset. tablet_id={}
rowset_id={}",
_tablet->tablet_id(),
output_rowset_meta->rowset_id().to_string());
@@ -280,6 +294,8 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
for (auto& seg_ptr : segments) {
auto idx_file_reader_iter = _inverted_index_file_readers.find(
std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id()));
+ DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op",
+ { idx_file_reader_iter = _inverted_index_file_readers.end(); })
if (idx_file_reader_iter == _inverted_index_file_readers.end()) {
LOG(ERROR) << "idx_file_reader_iter" <<
output_rowset_meta->rowset_id() << ":"
<< seg_ptr->id() << " cannot be found";
@@ -350,6 +366,8 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
InvertedIndexStorageFormatPB::V2) {
auto idx_file_reader_iter = _inverted_index_file_readers.find(
std::make_pair(output_rowset_meta->rowset_id().to_string(), seg_ptr->id()));
+ DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_can_not_find_reader",
+ { idx_file_reader_iter = _inverted_index_file_readers.end(); })
if (idx_file_reader_iter == _inverted_index_file_readers.end()) {
LOG(ERROR) << "idx_file_reader_iter" <<
output_rowset_meta->rowset_id() << ":"
<< seg_ptr->id() << " cannot be found";
@@ -395,7 +413,11 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
}
auto column = output_rowset_schema->column(column_idx);
// variant column is not support for building index
- if (!InvertedIndexColumnWriter::check_support_inverted_index(column)) {
+ auto is_support_inverted_index =
+ InvertedIndexColumnWriter::check_support_inverted_index(column);
+ DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_support_inverted_index",
+ { is_support_inverted_index = false; })
+ if (!is_support_inverted_index) {
continue;
}
DCHECK(output_rowset_schema->has_inverted_index_with_index_id(index_id));
@@ -408,6 +430,12 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
                 RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
                         field.get(), &inverted_index_builder,
                         inverted_index_file_writer.get(), index_meta));
+                DBUG_EXECUTE_IF(
+                        "IndexBuilder::handle_single_rowset_index_column_writer_create_error", {
+                            _CLTHROWA(CL_ERR_IO,
+                                      "debug point: "
+                                      "handle_single_rowset_index_column_writer_create_error");
+                        })
             } catch (const std::exception& e) {
                 return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                         "CLuceneError occured: {}", e.what());
@@ -438,6 +466,10 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
                 std::make_shared<Schema>(output_rowset_schema->columns(), return_columns);
         std::unique_ptr<RowwiseIterator> iter;
         auto res = seg_ptr->new_iterator(schema, read_options, &iter);
+        DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_create_iterator_error", {
+            res = Status::Error<ErrorCode::INTERNAL_ERROR>(
+                    "debug point: handle_single_rowset_create_iterator_error");
+        })
         if (!res.ok()) {
             LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
                          << "]: " << res.to_string();
@@ -448,7 +480,7 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
                 output_rowset_schema->create_block(return_columns));
         while (true) {
             auto status = iter->next_batch(block.get());
-            DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset", {
+            DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_iterator_next_batch_error", {
                 status = Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
                         "next_batch fault injection");
             });
@@ -463,8 +495,15 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
             }
             // write inverted index data
-            if (_write_inverted_index_data(output_rowset_schema, iter->data_id(),
-                                           block.get()) != Status::OK()) {
+            status = _write_inverted_index_data(output_rowset_schema, iter->data_id(),
+                                                block.get());
+            DBUG_EXECUTE_IF(
+                    "IndexBuilder::handle_single_rowset_write_inverted_index_data_error", {
+                        status = Status::Error<ErrorCode::INTERNAL_ERROR>(
+                                "debug point: "
+                                "handle_single_rowset_write_inverted_index_data_error");
+                    })
+            if (!status.ok()) {
                 return Status::Error<ErrorCode::SCHEMA_CHANGE_INFO_INVALID>(
                         "failed to write block.");
             }
@@ -477,6 +516,10 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
                 if (_inverted_index_builders[writer_sign]) {
                     RETURN_IF_ERROR(_inverted_index_builders[writer_sign]->finish());
                 }
+                DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_index_build_finish_error", {
+                    _CLTHROWA(CL_ERR_IO,
+                              "debug point: handle_single_rowset_index_build_finish_error");
+                })
             } catch (const std::exception& e) {
                 return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                         "CLuceneError occured: {}", e.what());
@@ -487,6 +530,10 @@ Status IndexBuilder::handle_single_rowset(RowsetMetaSharedPtr output_rowset_meta
     }
     for (auto&& [seg_id, inverted_index_file_writer] : _inverted_index_file_writers) {
         auto st = inverted_index_file_writer->close();
+        DBUG_EXECUTE_IF("IndexBuilder::handle_single_rowset_file_writer_close_error", {
+            st = Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
+                    "debug point: handle_single_rowset_file_writer_close_error");
+        })
         if (!st.ok()) {
             LOG(ERROR) << "close inverted_index_writer error:" << st;
             return st;
@@ -516,6 +563,8 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema,
         auto index_id = inverted_index.index_id;
         auto column_name = inverted_index.columns[0];
         auto column_idx = tablet_schema->field_index(column_name);
+        DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_column_idx_is_negative",
+                        { column_idx = -1; })
         if (column_idx < 0) {
             if (!inverted_index.column_unique_ids.empty()) {
                 auto column_unique_id = inverted_index.column_unique_ids[0];
@@ -532,6 +581,10 @@ Status IndexBuilder::_write_inverted_index_data(TabletSchemaSPtr tablet_schema,
         auto writer_sign = std::make_pair(segment_idx, index_id);
         std::unique_ptr<Field> field(FieldFactory::create(column));
         auto converted_result = _olap_data_convertor->convert_column_data(i);
+        DBUG_EXECUTE_IF("IndexBuilder::_write_inverted_index_data_convert_column_data_error", {
+            converted_result.first = Status::Error<ErrorCode::INTERNAL_ERROR>(
+                    "debug point: _write_inverted_index_data_convert_column_data_error");
+        })
         if (converted_result.first != Status::OK()) {
             LOG(WARNING) << "failed to convert block, errcode: " << converted_result.first;
             return converted_result.first;
@@ -583,6 +636,9 @@ Status IndexBuilder::_add_nullable(const std::string& column_name,
                         field->get_sub_field(0)->size(), reinterpret_cast<const void*>(data),
                         reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
             }
+            DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_add_array_values_error", {
+                _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_add_array_values_error");
+            })
         } catch (const std::exception& e) {
             return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                     "CLuceneError occured: {}", e.what());
@@ -608,6 +664,8 @@ Status IndexBuilder::_add_nullable(const std::string& column_name,
             }
             *ptr += field->size() * step;
             offset += step;
+            DBUG_EXECUTE_IF("IndexBuilder::_add_nullable_throw_exception",
+                            { _CLTHROWA(CL_ERR_IO, "debug point: _add_nullable_throw_exception"); })
         } while (offset < num_rows);
     } catch (const std::exception& e) {
         return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
@@ -640,6 +698,8 @@ Status IndexBuilder::_add_data(const std::string& column_name,
                 RETURN_IF_ERROR(_inverted_index_builders[index_writer_sign]->add_values(
                         column_name, *ptr, num_rows));
             }
+            DBUG_EXECUTE_IF("IndexBuilder::_add_data_throw_exception",
+                            { _CLTHROWA(CL_ERR_IO, "debug point: _add_data_throw_exception"); })
         } catch (const std::exception& e) {
             return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>("CLuceneError occured: {}",
                                                                           e.what());
@@ -665,6 +725,8 @@ Status IndexBuilder::handle_inverted_index_data() {
Status IndexBuilder::do_build_inverted_index() {
     LOG(INFO) << "begin to do_build_inverted_index, tablet=" << _tablet->tablet_id()
               << ", is_drop_op=" << _is_drop_op;
+    DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty",
+                    { _alter_inverted_indexes.clear(); })
     if (_alter_inverted_indexes.empty()) {
         return Status::OK();
     }
@@ -735,6 +797,10 @@ Status IndexBuilder::do_build_inverted_index() {
     // modify rowsets in memory
     st = modify_rowsets();
+    DBUG_EXECUTE_IF("IndexBuilder::do_build_inverted_index_modify_rowsets_status_error", {
+        st = Status::Error<ErrorCode::DELETE_VERSION_ERROR>(
+                "debug point: do_build_inverted_index_modify_rowsets_status_error");
+    })
     if (!st.ok()) {
         LOG(WARNING) << "failed to modify rowsets in memory. "
                      << "tablet=" << _tablet->tablet_id() << ", error=" << st;
@@ -792,7 +858,10 @@ Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) {
 void IndexBuilder::gc_output_rowset() {
     for (auto&& output_rowset : _output_rowsets) {
-        if (!output_rowset->is_local()) {
+        auto is_local_rowset = output_rowset->is_local();
+        DBUG_EXECUTE_IF("IndexBuilder::gc_output_rowset_is_local_rowset",
+                        { is_local_rowset = false; })
+        if (!is_local_rowset) {
             _tablet->record_unused_remote_rowset(output_rowset->rowset_id(),
                                                  output_rowset->rowset_meta()->resource_id(),
                                                  output_rowset->num_segments());
diff --git a/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy
new file mode 100644
index 00000000000..ac3cd8125a8
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_index_compaction_exception_fault_injection.groovy
@@ -0,0 +1,341 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_index_compaction_exception_fault_injection", "nonConcurrent") {
+ def isCloudMode = isCloudMode()
+ def tableName = "test_index_compaction_exception_fault_injection_dups"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def changed_variables = sql "show variables where Changed = 1"
+ logger.info("changed variables: " + changed_variables.toString())
+ // sql "UNSET GLOBAL VARIABLE ALL;"
+ sql "SET global enable_match_without_inverted_index = false"
+
+ boolean disableAutoCompaction = false
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+ }
+ }
+
+ def trigger_full_compaction_on_tablets = { tablets ->
+ for (def tablet : tablets) {
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+ int times = 1
+
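+            // retry until the BE reports success, at most 10 times; "e-6010" is also
+            // treated as terminal (assumed to mean no suitable version to compact)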
+            String compactionStatus;
+            do {
+                def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+                ++times
+                sleep(2000)
+                compactionStatus = parseJson(out.trim()).status.toLowerCase();
+            } while (compactionStatus != "success" && times <= 10 && compactionStatus != "e-6010")
+
+            if (compactionStatus == "fail") {
+                assertEquals(disableAutoCompaction, false)
+                logger.info("Compaction was done automatically!")
+            }
+            if (disableAutoCompaction && compactionStatus != "e-6010") {
+                assertEquals("success", compactionStatus)
+            }
+ }
+ }
+
+ def wait_full_compaction_done = { tablets ->
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ String backend_id = tablet.BackendId
+                def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+ }
+
+ def get_rowset_count = { tablets ->
+ int rowsetCount = 0
+ for (def tablet in tablets) {
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount += ((List<String>) tabletJson.rowsets).size()
+ }
+ return rowsetCount
+ }
+
+ def check_config = { String key, String value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == key) {
+ assertEquals(value, ((List<String>) ele)[2])
+ }
+ }
+ }
+ }
+
+ def insert_data = { ->
+        sql """ INSERT INTO ${tableName} VALUES (1, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (1, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+ }
+
+ def run_sql = { ->
+        def result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'bason'"
+ assertEquals(3, result.size())
+ assertEquals(1, result[0]['id'])
+ assertEquals("bason", result[0]['name'])
+ assertEquals(2, result[1]['id'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals(3, result[2]['id'])
+ assertEquals("bason", result[2]['name'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE age = 11"
+ assertEquals(3, result.size())
+ assertEquals(1, result[0]['id'])
+ assertEquals("bason", result[0]['name'])
+ assertEquals(2, result[1]['id'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals(3, result[2]['id'])
+ assertEquals("bason", result[2]['name'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'singing'"
+ assertEquals(3, result.size())
+ assertEquals("bason", result[0]['name'])
+ assertEquals("bason is good at singing", result[0]['description'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals("bason is good at singing", result[1]['description'])
+ assertEquals("bason", result[2]['name'])
+ assertEquals("bason is good at singing", result[2]['description'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)"
+ assertEquals(3, result.size())
+ assertEquals("bason", result[0]['name'])
+ assertEquals("[79, 85, 97]", result[0]['scores'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals("[79, 85, 97]", result[1]['scores'])
+ assertEquals("bason", result[2]['name'])
+ assertEquals("[79, 85, 97]", result[2]['scores'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'dancing')"
+ assertEquals(3, result.size())
+ assertEquals("bason", result[0]['name'])
+ assertEquals('["singing", "dancing"]', result[0]['hobbies'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals('["singing", "dancing"]', result[1]['hobbies'])
+ assertEquals("bason", result[2]['name'])
+ assertEquals('["singing", "dancing"]', result[2]['hobbies'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'bason is very clever')"
+ assertEquals(3, result.size())
+ assertEquals("bason", result[0]['name'])
+        assertEquals('["bason is very clever", "bason is very healthy"]', result[0]['evaluation'])
+        assertEquals("bason", result[1]['name'])
+        assertEquals('["bason is very clever", "bason is very healthy"]', result[1]['evaluation'])
+        assertEquals("bason", result[2]['name'])
+        assertEquals('["bason is very clever", "bason is very healthy"]', result[2]['evaluation'])
+ }
+
+ // define debug points array
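+    // "abnormal" points are expected to make the first compaction attempt fail,
+    // while "normal" points only skip index compaction and let compaction succeed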
+    def debug_points_abnormal_compaction = [
+        "compact_column_getDirectory_error",
+        "compact_column_create_index_writer_error",
+        "compact_column_indexCompaction_error",
+        "compact_column_index_writer_close_error",
+        "compact_column_src_index_dirs_close_error",
+        "Compaction::do_inverted_index_compaction_find_rowset_error",
+        "Compaction::do_inverted_index_compaction_get_fs_error",
+        "Compaction::do_inverted_index_compaction_index_file_reader_init_error",
+        // "Compaction::do_inverted_index_compaction_file_size_status_not_ok", // v2 does not do index compaction
+        "Compaction::do_inverted_index_compaction_can_not_find_index_meta",
+        "Compaction::do_inverted_index_compaction_index_properties_different",
+        "Compaction::do_inverted_index_compaction_index_file_writer_close_not_ok",
+        "Compaction::construct_skip_inverted_index_index_reader_close_error"
+    ]
+
+    def debug_points_normal_compaction = [
+        "compact_column_local_tmp_dir_delete_error",
+        // "Compaction::do_inverted_index_compaction_dest_segment_num_is_zero", // query result does not match without inverted index
+        "Compaction::do_inverted_index_compaction_index_file_reader_init_not_found",
+        "Compaction::construct_skip_inverted_index_is_skip_index_compaction",
+        "Compaction::construct_skip_inverted_index_get_fs_error",
+        "Compaction::construct_skip_inverted_index_index_meta_nullptr",
+        "Compaction::construct_skip_inverted_index_seg_path_nullptr",
+        "Compaction::construct_skip_inverted_index_index_file_reader_init_status_not_ok",
+        "Compaction::construct_skip_inverted_index_index_file_reader_exist_status_not_ok",
+        "Compaction::construct_skip_inverted_index_index_file_reader_exist_false",
+        "Compaction::construct_skip_inverted_index_index_file_reader_open_error",
+        "Compaction::construct_skip_inverted_index_index_files_count"
+    ]
+
+ def run_test = { tablets, debug_point, abnormal ->
+ insert_data.call()
+
+ run_sql.call()
+
+ int replicaNum = 1
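+        // `tablets` lists one row per replica, so dividing the row count by the
+        // number of distinct tablet ids gives the replication factor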
+ def dedup_tablets = deduplicate_tablets(tablets)
+ if (dedup_tablets.size() > 0) {
+ replicaNum = Math.round(tablets.size() / dedup_tablets.size())
+ if (replicaNum != 1 && replicaNum != 3) {
+ assert(false)
+ }
+ }
+
+ // before full compaction, there are 7 rowsets.
+ int rowsetCount = get_rowset_count.call(tablets);
+ assert (rowsetCount == 7 * replicaNum)
+
+        // enable the debug point, trigger full compaction, wait for it to finish, then disable the debug point
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(debug_point)
+            logger.info("trigger_full_compaction_on_tablets with fault injection: ${debug_point}")
+ trigger_full_compaction_on_tablets.call(tablets)
+ wait_full_compaction_done.call(tablets)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(debug_point)
+ }
+
+ if (abnormal) {
+ // after fault injection, there are still 7 rowsets.
+ rowsetCount = get_rowset_count.call(tablets);
+ assert (rowsetCount == 7 * replicaNum)
+
+ logger.info("trigger_full_compaction_on_tablets normally")
+ // trigger full compactions for all tablets in ${tableName}
+            // this time, index compaction of some columns will be skipped because of the fault injection
+ trigger_full_compaction_on_tablets.call(tablets)
+
+ // wait for full compaction done
+ wait_full_compaction_done.call(tablets)
+ }
+
+ // after full compaction, there is only 1 rowset.
+ rowsetCount = get_rowset_count.call(tablets);
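+        // cloud mode is expected to keep one extra rowset after full compaction
+        // (presumably the initial empty rowset is retained there)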
+ if (isCloudMode) {
+ assert (rowsetCount == (1 + 1) * replicaNum)
+ } else {
+ assert (rowsetCount == 1 * replicaNum)
+ }
+
+ run_sql.call()
+ }
+
+    def create_and_test_table = { table_name, key_type, debug_points, is_abnormal ->
+ debug_points.each { debug_point ->
+ sql """ DROP TABLE IF EXISTS ${table_name}; """
+ sql """
+ CREATE TABLE ${table_name} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `age` int(11) NULL,
+ `scores` array<int> NULL,
+ `hobbies` array<text> NULL,
+ `description` text NULL,
+ `evaluation` array<text> NULL,
+ index index_name (name) using inverted,
+ index index_age (age) using inverted,
+ index index_scores (scores) using inverted,
+ index index_hobbies (hobbies) using inverted,
+                index index_description (description) using inverted properties("parser" = "english"),
+ index index_evaluation (evaluation) using inverted
+ ) ENGINE=OLAP
+ ${key_type} KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "inverted_index_storage_format" = "V1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+            def tablets = sql_return_maparray """ show tablets from ${table_name}; """
+ run_test.call(tablets, debug_point, is_abnormal)
+ }
+ }
+
+ boolean invertedIndexCompactionEnable = false
+ boolean has_update_be_config = false
+ try {
+ String backend_id;
+ backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "inverted_index_compaction_enable") {
+                invertedIndexCompactionEnable = Boolean.parseBoolean(((List<String>) ele)[2])
+                logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}")
+            }
+            if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+                logger.info("disable_auto_compaction: ${((List<String>) ele)[2]}")
+            }
+ }
+ set_be_config.call("inverted_index_compaction_enable", "true")
+ has_update_be_config = true
+ // check updated config
+ check_config.call("inverted_index_compaction_enable", "true");
+
+ // duplicated key table
+        create_and_test_table.call(tableName, "DUPLICATE", debug_points_abnormal_compaction, true)
+        create_and_test_table.call(tableName, "DUPLICATE", debug_points_normal_compaction, false)
+
+ // unique key table
+ tableName = "test_index_compaction_exception_fault_injection_unique"
+        create_and_test_table.call(tableName, "UNIQUE", debug_points_abnormal_compaction, true)
+        create_and_test_table.call(tableName, "UNIQUE", debug_points_normal_compaction, false)
+
+ } finally {
+ if (has_update_be_config) {
+            set_be_config.call("inverted_index_compaction_enable", invertedIndexCompactionEnable.toString())
+ }
+ sql "SET global enable_match_without_inverted_index = true"
+ }
+
+}
diff --git a/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
new file mode 100644
index 00000000000..6deb96bfdea
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_write_inverted_index_exception_fault_injection.groovy
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_write_inverted_index_exception_fault_injection", "nonConcurrent") {
+ def tableNamePrefix = "test_write_inverted_index_exception_fault_injection"
+
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def set_be_config = { key, value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+ }
+ }
+
+ def check_config = { String key, String value ->
+ for (String backend_id: backendId_to_backendIP.keySet()) {
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == key) {
+ assertEquals(value, ((List<String>) ele)[2])
+ }
+ }
+ }
+ }
+
+ def changed_variables = sql "show variables where Changed = 1"
+ logger.info("changed variables: " + changed_variables.toString())
+ // sql "UNSET GLOBAL VARIABLE ALL;"
+
+ sql "SET global enable_match_without_inverted_index = false"
+ boolean inverted_index_ram_dir_enable = true
+ boolean has_update_be_config = false
+
+    def create_table = { String tableName, String format ->
+ sql "DROP TABLE IF EXISTS ${tableName}"
+
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `age` int(11) NULL,
+ `scores` array<int> NULL,
+ `hobbies` array<text> NULL,
+ `description` text NULL,
+ `evaluation` array<text> NULL,
+ index index_name (name) using inverted,
+ index index_age (age) using inverted,
+ index index_scores (scores) using inverted,
+ index index_hobbies (hobbies) using inverted,
+                index index_description (description) using inverted properties("parser" = "english"),
+ index index_evaluation (evaluation) using inverted
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "inverted_index_storage_format" = "${format}",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ }
+
+ def run_insert = { String tableName ->
+        sql """ INSERT INTO ${tableName} VALUES (1, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (1, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+ }
+
+ def check_count = { String tableName, int count ->
+ def result = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertEquals(count, result[0][0])
+ }
+
+ def run_select = { String tableName, boolean normal ->
+        def result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'andy'"
+ assertEquals(3, result.size())
+ assertEquals(1, result[0]['id'])
+ assertEquals("andy", result[0]['name'])
+ assertEquals(2, result[1]['id'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals(3, result[2]['id'])
+ assertEquals("andy", result[2]['name'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE age < 11"
+ assertEquals(3, result.size())
+ assertEquals("andy", result[0]['name'])
+ assertEquals(2, result[1]['id'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals(3, result[2]['id'])
+ assertEquals("andy", result[2]['name'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'sports'"
+ assertEquals(3, result.size())
+ assertEquals("andy", result[0]['name'])
+ assertEquals("andy is good at sports", result[0]['description'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals("andy is good at sports", result[1]['description'])
+ assertEquals("andy", result[2]['name'])
+ assertEquals("andy is good at sports", result[2]['description'])
+
+ if (normal) {
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)"
+ assertEquals(3, result.size())
+ assertEquals("bason", result[0]['name'])
+ assertEquals("[79, 85, 97]", result[0]['scores'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals("[79, 85, 97]", result[1]['scores'])
+ assertEquals("bason", result[2]['name'])
+ assertEquals("[79, 85, 97]", result[2]['scores'])
+
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'football')"
+ assertEquals(3, result.size())
+ assertEquals("andy", result[0]['name'])
+ assertEquals('["football", "basketball"]', result[0]['hobbies'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals('["football", "basketball"]', result[1]['hobbies'])
+ assertEquals("andy", result[2]['name'])
+ assertEquals('["football", "basketball"]', result[2]['hobbies'])
+
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'andy is so nice')"
+            assertEquals(3, result.size())
+            assertEquals("andy", result[0]['name'])
+            assertEquals('["andy has a good heart", "andy is so nice"]', result[0]['evaluation'])
+            assertEquals("andy", result[1]['name'])
+            assertEquals('["andy has a good heart", "andy is so nice"]', result[1]['evaluation'])
+            assertEquals("andy", result[2]['name'])
+            assertEquals('["andy has a good heart", "andy is so nice"]', result[2]['evaluation'])
+ } else {
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)"
+            assertEquals(0, result.size())
+
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'football')"
+            assertEquals(0, result.size())
+
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'andy is so nice')"
+            assertEquals(0, result.size())
+ }
+ }
+
+ // define debug points array
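+    // each entry names a DBUG_EXECUTE_IF point in the BE write path; the test enables
+    // one point at a time and checks that inserts either succeed or fail cleanly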
+    def debug_points = [
+        "inverted_index_parser.get_parser_stopwords_from_properties",
+        "CharFilterFactory::create_return_nullptr",
+        "InvertedIndexFileWriter::open_local_fs_exists_error",
+        "InvertedIndexFileWriter::open_local_fs_exists_true",
+        "InvertedIndexFileWriter::delete_index_index_meta_nullptr",
+        "InvertedIndexFileWriter::delete_index_indices_dirs_reach_end",
+        "InvertedIndexFileWriter::copyFile_openInput_error",
+        "InvertedIndexFileWriter::copyFile_remainder_is_not_zero",
+        "InvertedIndexFileWriter::copyFile_diff_not_equals_length",
+        "InvertedIndexFileWriter::write_v1_ram_output_is_nullptr",
+        "InvertedIndexFileWriter::write_v1_out_dir_createOutput_nullptr",
+        "FSIndexInput::~SharedHandle_reader_close_error",
+        "DorisFSDirectory::FSIndexInput::readInternal_reader_read_at_error",
+        "DorisFSDirectory::FSIndexInput::readInternal_bytes_read_error",
+        "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init",
+        "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor",
+        "DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer",
+        "DorisFSDirectory::FSIndexOutput::flushBuffer_writer_is_nullptr",
+        "DorisFSDirectory::FSIndexOutput::flushBuffer_b_is_nullptr",
+        "DorisFSDirectory::FSIndexOutput.set_writer_nullptr",
+        "DorisFSDirectory::FSIndexOutput._set_writer_close_status_error",
+        "DorisFSDirectory::FSIndexOutputV2::flushBuffer_file_writer_is_nullptr",
+        "DorisFSDirectory::FSIndexOutputV2::flushBuffer_b_is_nullptr",
+        "DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close",
+        "DorisFSDirectory::list_status_is_not_ok",
+        "DorisFSDirectory::list_directory_not_exists",
+        "DorisFSDirectory::fileExists_status_is_not_ok",
+        "DorisFSDirectory::touchFile_status_is_not_ok",
+        "DorisFSDirectory::fileLength_status_is_not_ok",
+        //"DorisFSDirectory::close_close_with_error", // will block the process, off now
+        "DorisFSDirectory::doDeleteFile_status_is_not_ok",
+        "DorisFSDirectory::deleteDirectory_throw_is_not_directory",
+        "DorisFSDirectory::renameFile_exists_status_is_not_ok",
+        "DorisFSDirectory::renameFile_delete_status_is_not_ok",
+        "DorisFSDirectory::renameFile_rename_status_is_not_ok",
+        "DorisFSDirectory::createOutput_exists_status_is_not_ok",
+        "DorisFSDirectory::createOutput_delete_status_is_not_ok",
+        "DorisFSDirectory::createOutput_exists_after_delete_status_is_not_ok",
+        "DorisFSDirectory::createOutput_exists_after_delete_error",
+        "DorisRAMFSDirectory::fileModified_file_not_found",
+        "DorisRAMFSDirectory::touchFile_file_not_found",
+        "DorisRAMFSDirectory::fileLength_file_not_found",
+        "DorisRAMFSDirectory::openInput_file_not_found",
+        "DorisRAMFSDirectory::close_close_with_error",
+        "DorisRAMFSDirectory::createOutput_itr_filesMap_end",
+        "DorisFSDirectoryFactory::getDirectory_file_is_nullptr",
+        "DorisFSDirectoryFactory::getDirectory_exists_status_is_not_ok",
+        "DorisFSDirectoryFactory::getDirectory_create_directory_status_is_not_ok",
+        "InvertedIndexColumnWriter::init_field_type_not_supported",
+        "InvertedIndexColumnWriter::init_inverted_index_writer_init_error",
+        "InvertedIndexColumnWriter::close_on_error_throw_exception",
+        "InvertedIndexColumnWriter::init_bkd_index_throw_error",
+        "InvertedIndexColumnWriter::create_chinese_analyzer_throw_error",
+        "InvertedIndexColumnWriter::open_index_directory_error",
+        "InvertedIndexColumnWriter::create_index_writer_setRAMBufferSizeMB_error",
+        "InvertedIndexColumnWriter::create_index_writer_setMaxBufferedDocs_error",
+        "InvertedIndexColumnWriter::create_index_writer_setMergeFactor_error",
+        "InvertedIndexColumnWriterImpl::add_document_throw_error",
+        "InvertedIndexColumnWriterImpl::add_null_document_throw_error",
+        "InvertedIndexColumnWriterImpl::add_nulls_field_nullptr",
+        "InvertedIndexColumnWriterImpl::add_nulls_index_writer_nullptr",
+        "InvertedIndexColumnWriterImpl::new_char_token_stream__char_string_reader_init_error",
+        "InvertedIndexColumnWriterImpl::add_values_field_is_nullptr",
+        "InvertedIndexColumnWriterImpl::add_values_index_writer_is_nullptr",
+        "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero",
+        "InvertedIndexColumnWriterImpl::add_array_values_index_writer_is_nullptr",
+        "InvertedIndexColumnWriterImpl::add_array_values_create_field_error",
+        "InvertedIndexColumnWriterImpl::add_array_values_create_field_error_2",
+        "InvertedIndexColumnWriterImpl::add_array_values_field_is_nullptr",
+        "InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close",
+        "InvertedIndexColumnWriter::create_array_typeinfo_is_nullptr",
+        "InvertedIndexColumnWriter::create_unsupported_type_for_inverted_index"
+    ]
+
+ def inverted_index_storage_format = ["v1", "v2"]
+
+ try {
+ String backend_id;
+ backend_id = backendId_to_backendIP.keySet()[0]
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == "inverted_index_ram_dir_enable") {
+                // save the original value so the finally block can restore it
+                inverted_index_ram_dir_enable = Boolean.parseBoolean(((List<String>) ele)[2])
+                logger.info("inverted_index_ram_dir_enable: ${((List<String>) ele)[2]}")
+            }
+        }
+ set_be_config.call("inverted_index_ram_dir_enable", "false")
+ has_update_be_config = true
+ // check updated config
+ check_config.call("inverted_index_ram_dir_enable", "false");
+ inverted_index_storage_format.each { format ->
+ def tableName = "${tableNamePrefix}_${format}"
+            create_table("${tableName}", format)
+
+            // for each debug point, enable it, run the insert, check the count, and disable the debug point
+            // catch any exceptions and disable the debug point
+ debug_points.each { debug_point ->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(debug_point)
+ run_insert("${tableName}")
+ check_count("${tableName}", 6)
+                    // add_array_values_count_is_zero leaves the array indexes empty,
+                    // so the array queries are expected to return no rows (normal = false)
+                    if (debug_point == "InvertedIndexColumnWriterImpl::add_array_values_count_is_zero") {
+                        run_select("${tableName}", false)
+                    } else {
+                        run_select("${tableName}", true)
+                    }
+ sql "TRUNCATE TABLE ${tableName}"
+ } catch (Exception e) {
+ log.error("Caught exception: ${e}")
+ check_count("${tableName}", 0)
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(debug_point)
+ }
+ }
+ }
+ } finally {
+ if (has_update_be_config) {
+            set_be_config.call("inverted_index_ram_dir_enable", inverted_index_ram_dir_enable.toString())
+ }
+ sql "SET global enable_match_without_inverted_index = true"
+ }
+
+}
diff --git a/regression-test/suites/fault_injection_p2/test_build_index_exception_fault_injection.groovy b/regression-test/suites/fault_injection_p2/test_build_index_exception_fault_injection.groovy
new file mode 100644
index 00000000000..94cf4d6c15c
--- /dev/null
+++ b/regression-test/suites/fault_injection_p2/test_build_index_exception_fault_injection.groovy
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_build_index_exception_fault_injection", "nonConcurrent,p2") {
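+    // skip this suite in cloud mode; it only targets local (non-cloud) index builds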
+ if (isCloudMode()) {
+ return
+ }
+ def tableNamePrefix = "test_build_index_exception_fault_injection"
+
+ def changed_variables = sql "show variables where Changed = 1"
+ logger.info("changed variables: " + changed_variables.toString())
+ // sql "UNSET GLOBAL VARIABLE ALL;"
+ sql "SET global enable_match_without_inverted_index = false"
+
+ // prepare test table
+ def timeout = 60000
+ def delta_time = 1000
+ def alter_res = "null"
+ def useTime = 0
+
+ def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
+        for (int t = delta_time; t <= OpTimeout; t += delta_time) {
+            alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
+            alter_res = alter_res.toString()
+            if (alter_res.contains("FINISHED")) {
+                sleep(3000) // wait for the table state to return to normal
+                logger.info(table_name + " latest alter job finished, detail: " + alter_res)
+                break
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
+ }
+
+ def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for (int t = delta_time; t <= OpTimeout; t += delta_time) {
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7];
+                if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                    logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res[alter_res.size()-1])
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
+ return "wait_timeout"
+ }
+
+    def create_table = { String tableName, String format ->
+ sql "DROP TABLE IF EXISTS ${tableName}"
+
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `age` int(11) NULL,
+ `scores` array<int> NULL,
+ `hobbies` array<text> NULL,
+ `description` text NULL,
+ `evaluation` array<text> NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "inverted_index_storage_format" = "${format}",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ }
+
+ def create_index = { String tableName ->
+ sql "CREATE INDEX idx_name ON ${tableName} (name) USING INVERTED"
+ wait_for_latest_op_on_table_finish("${tableName}", timeout)
+ sql "CREATE INDEX idx_age ON ${tableName} (age) USING INVERTED"
+ wait_for_latest_op_on_table_finish("${tableName}", timeout)
+ sql "CREATE INDEX idx_scores ON ${tableName} (scores) USING INVERTED"
+ wait_for_latest_op_on_table_finish("${tableName}", timeout)
+ sql "CREATE INDEX idx_hobbies ON ${tableName} (hobbies) USING INVERTED"
+ wait_for_latest_op_on_table_finish("${tableName}", timeout)
+        sql "CREATE INDEX idx_description ON ${tableName} (description) USING INVERTED properties(\"parser\" = \"english\")"
+ wait_for_latest_op_on_table_finish("${tableName}", timeout)
+        sql "CREATE INDEX idx_evaluation ON ${tableName} (evaluation) USING INVERTED"
+ wait_for_latest_op_on_table_finish("${tableName}", timeout)
+ }
+
+ def build_index = { String tableName ->
+ sql "BUILD INDEX idx_name ON ${tableName}"
+ wait_for_last_build_index_on_table_finish("${tableName}", timeout)
+ sql "BUILD INDEX idx_age ON ${tableName}"
+ wait_for_last_build_index_on_table_finish("${tableName}", timeout)
+ sql "BUILD INDEX idx_scores ON ${tableName}"
+ wait_for_last_build_index_on_table_finish("${tableName}", timeout)
+ sql "BUILD INDEX idx_hobbies ON ${tableName}"
+ wait_for_last_build_index_on_table_finish("${tableName}", timeout)
+ sql "BUILD INDEX idx_description ON ${tableName}"
+ wait_for_last_build_index_on_table_finish("${tableName}", timeout)
+ sql "BUILD INDEX idx_evaluation ON ${tableName}"
+ wait_for_last_build_index_on_table_finish("${tableName}", timeout)
+ }
+
+ def run_insert = { String tableName ->
+        sql """ INSERT INTO ${tableName} VALUES (1, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (1, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (2, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "andy", 10, [89, 80, 98], ["football", "basketball"], "andy is good at sports", ["andy has a good heart", "andy is so nice"]); """
+        sql """ INSERT INTO ${tableName} VALUES (3, "bason", 11, [79, 85, 97], ["singing", "dancing"], "bason is good at singing", ["bason is very clever", "bason is very healthy"]); """
+ }
+
+ def check_count = { String tableName, int count ->
+ def result = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertEquals(count, result[0][0])
+ }
+
+ def run_select = { String tableName, boolean normal ->
+ def result
+
+ if (normal) {
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'andy'"
+ assertEquals(3, result.size())
+ assertEquals(1, result[0]['id'])
+ assertEquals("andy", result[0]['name'])
+ assertEquals(2, result[1]['id'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals(3, result[2]['id'])
+ assertEquals("andy", result[2]['name'])
+
+            result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'sports'"
+ assertEquals(3, result.size())
+ assertEquals("andy", result[0]['name'])
+ assertEquals("andy is good at sports", result[0]['description'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals("andy is good at sports", result[1]['description'])
+ assertEquals("andy", result[2]['name'])
+ assertEquals("andy is good at sports", result[2]['description'])
+ } else {
+            try {
+                result = sql_return_maparray "SELECT * FROM ${tableName} WHERE name MATCH 'andy'"
+                assertEquals(0, result.size())
+            } catch (Exception e) {
+                log.error("Caught exception: ${e}")
+                assertContains(e.toString(), "[E-6001]match_any not support execute_match")
+            }
+            try {
+                result = sql_return_maparray "SELECT * FROM ${tableName} WHERE description MATCH 'sports'"
+                assertEquals(0, result.size())
+            } catch (Exception e) {
+                log.error("Caught exception: ${e}")
+                assertContains(e.toString(), "[E-6001]match_any not support execute_match")
+            }
+ }
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE age < 11"
+ assertEquals(3, result.size())
+ assertEquals("andy", result[0]['name'])
+ assertEquals(2, result[1]['id'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals(3, result[2]['id'])
+ assertEquals("andy", result[2]['name'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(scores, 79)"
+ assertEquals(3, result.size())
+ assertEquals("bason", result[0]['name'])
+ assertEquals("[79, 85, 97]", result[0]['scores'])
+ assertEquals("bason", result[1]['name'])
+ assertEquals("[79, 85, 97]", result[1]['scores'])
+ assertEquals("bason", result[2]['name'])
+ assertEquals("[79, 85, 97]", result[2]['scores'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(hobbies, 'football')"
+ assertEquals(3, result.size())
+ assertEquals("andy", result[0]['name'])
+ assertEquals('["football", "basketball"]', result[0]['hobbies'])
+ assertEquals("andy", result[1]['name'])
+ assertEquals('["football", "basketball"]', result[1]['hobbies'])
+ assertEquals("andy", result[2]['name'])
+ assertEquals('["football", "basketball"]', result[2]['hobbies'])
+
+        result = sql_return_maparray "SELECT * FROM ${tableName} WHERE array_contains(evaluation, 'andy is so nice')"
+        assertEquals(3, result.size())
+        assertEquals("andy", result[0]['name'])
+        assertEquals('["andy has a good heart", "andy is so nice"]', result[0]['evaluation'])
+        assertEquals("andy", result[1]['name'])
+        assertEquals('["andy has a good heart", "andy is so nice"]', result[1]['evaluation'])
+        assertEquals("andy", result[2]['name'])
+        assertEquals('["andy has a good heart", "andy is so nice"]', result[2]['evaluation'])
+ }
+
+ // define debug points array
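+    // each entry names a DBUG_EXECUTE_IF point in IndexBuilder; while a point is
+    // active, BUILD INDEX is expected to fail or be skipped without corrupting data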
+    def debug_points = [
+        "IndexBuilder::update_inverted_index_info_is_local_rowset",
+        "IndexBuilder::update_inverted_index_info_size_st_not_ok",
+        "IndexBuilder::update_inverted_index_info_index_file_reader_init_not_ok",
+        "IndexBuilder::handle_single_rowset_is_local_rowset",
+        "IndexBuilder::handle_single_rowset_can_not_find_reader_drop_op",
+        "IndexBuilder::handle_single_rowset_can_not_find_reader",
+        "IndexBuilder::handle_single_rowset_support_inverted_index",
+        "IndexBuilder::handle_single_rowset_index_column_writer_create_error",
+        "IndexBuilder::handle_single_rowset_create_iterator_error",
+        "IndexBuilder::handle_single_rowset",
+        "IndexBuilder::handle_single_rowset_iterator_next_batch_error",
+        "IndexBuilder::handle_single_rowset_write_inverted_index_data_error",
+        "IndexBuilder::handle_single_rowset_index_build_finish_error",
+        "IndexBuilder::handle_single_rowset_file_writer_close_error",
+        // "IndexBuilder::_write_inverted_index_data_column_idx_is_negative" // skip build index
+        "IndexBuilder::_write_inverted_index_data_convert_column_data_error",
+        "IndexBuilder::_add_nullable_add_array_values_error",
+        "IndexBuilder::_add_nullable_throw_exception",
+        "IndexBuilder::_add_data_throw_exception",
+        "IndexBuilder::do_build_inverted_index_alter_inverted_indexes_empty",
+        "IndexBuilder::do_build_inverted_index_modify_rowsets_status_error",
+        "IndexBuilder::gc_output_rowset_is_local_rowset"
+    ]
+
+ def inverted_index_storage_format = ["v1", "v2"]
+ inverted_index_storage_format.each { format ->
+ def tableName = "${tableNamePrefix}_${format}"
+
+        // for each debug point, enable it, run the insert, check the count, and disable the debug point
+ // catch any exceptions and disable the debug point
+ debug_points.each { debug_point ->
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(debug_point)
+                create_table("${tableName}", format)
+ run_insert("${tableName}")
+ create_index("${tableName}")
+ build_index("${tableName}")
+ check_count("${tableName}", 6)
+ run_select("${tableName}", false)
+ } catch (Exception e) {
+ log.error("Caught exception: ${e}")
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(debug_point)
+ }
+ }
+ }
+
+ sql "SET global enable_match_without_inverted_index = true"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]