This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 394dd6af3a5 branch-3.1: [fix](inverted index) fix fs set when meet
segment corruption #58317 (#58391)
394dd6af3a5 is described below
commit 394dd6af3a5c177d2185c2e577bb054bcd09cf8c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 2 11:00:46 2025 +0800
branch-3.1: [fix](inverted index) fix fs set when meet segment corruption
#58317 (#58391)
Cherry-picked from #58317
---------
Co-authored-by: Jack <[email protected]>
---
.../segment_v2/inverted_index_file_reader.cpp | 5 +-
be/src/olap/rowset/segment_v2/segment.cpp | 3 +
.../rowset/segment_v2/segment_corruption_test.cpp | 361 +++++++++++++++++++++
run-be-ut.sh | 5 +
4 files changed, 373 insertions(+), 1 deletion(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
index 9a0fcd43e99..da8be0f0765 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -60,6 +60,8 @@ Status InvertedIndexFileReader::_init_from(int32_t
read_buffer_size, const io::I
}
})
+ DCHECK(_fs != nullptr) << "file system is nullptr,
index_file_full_path: "
+ << index_file_full_path;
// 2. open file
auto ok = DorisFSDirectory::FSIndexInput::open(
_fs, index_file_full_path.c_str(), index_input, err,
read_buffer_size, file_size);
@@ -177,7 +179,8 @@ Result<std::unique_ptr<DorisCompoundReader>>
InvertedIndexFileReader::_open(
"CLuceneError occur file size = -1, file is {}",
index_file_path));
}
})
-
+ DCHECK(_fs != nullptr)
+ << "file system is nullptr, index_file_path: " <<
index_file_path;
// 2. open file
auto ok = DorisFSDirectory::FSIndexInput::open(
_fs, index_file_path.c_str(), index_input, err,
_read_buffer_size, file_size);
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 4a90d2966a7..01818705015 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -124,6 +124,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const
std::string& path, uint32_t s
st = fs->open_file(path, &file_reader, &reader_options);
if (st) {
+ segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
}
@@ -138,6 +139,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const
std::string& path, uint32_t s
io::FileReaderOptions opt = reader_options;
opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
+ segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
if (!st.ok()) {
@@ -148,6 +150,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const
std::string& path, uint32_t s
}
}
RETURN_IF_ERROR(st);
+ DCHECK(segment->_fs != nullptr) << "file system is nullptr after segment
open";
*output = std::move(segment);
return Status::OK();
}
diff --git a/be/test/olap/rowset/segment_v2/segment_corruption_test.cpp
b/be/test/olap/rowset/segment_v2/segment_corruption_test.cpp
new file mode 100644
index 00000000000..5c7ef6f9ea9
--- /dev/null
+++ b/be/test/olap/rowset/segment_v2/segment_corruption_test.cpp
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <filesystem>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/fs/local_file_system.h"
+#include "olap/field.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/rowset/segment_v2/inverted_index_cache.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
+#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_reader.h"
+#include "olap/rowset/segment_v2/inverted_index_writer.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset/segment_v2/segment_writer.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_helper.h"
+#include "runtime/exec_env.h"
+
+namespace doris::segment_v2 {
+
+class SegmentCorruptionTest : public testing::Test {
+public:
+ static constexpr const char* kTestDir = "./ut_dir/segment_corruption_test";
+ static constexpr const char* kCacheDir =
"./ut_dir/segment_corruption_test_cache";
+ static constexpr const std::string_view tmp_dir =
"./ut_dir/segment_corruption_test/tmp";
+
+ static void SetUpTestSuite() {
+ // Initialize FileCacheFactory if not already done
+ if (ExecEnv::GetInstance()->file_cache_factory() == nullptr) {
+ _suite_factory = std::make_unique<io::FileCacheFactory>();
+ ExecEnv::GetInstance()->_file_cache_factory = _suite_factory.get();
+ }
+ std::vector<StorePath> paths;
+ paths.emplace_back(config::storage_root_path, -1);
+
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
+ paths.emplace_back(std::string(tmp_dir), 1024000000);
+ auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
+ EXPECT_TRUE(tmp_file_dirs->init().ok());
+ ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));
+
+ // Initialize InvertedIndexSearcherCache
+ if (ExecEnv::GetInstance()->get_inverted_index_searcher_cache() ==
nullptr) {
+ int64_t inverted_index_cache_limit = 1024 * 1024 * 1024;
+ _inverted_index_searcher_cache =
std::unique_ptr<InvertedIndexSearcherCache>(
+
InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit,
+ 1));
+ ExecEnv::GetInstance()->set_inverted_index_searcher_cache(
+ _inverted_index_searcher_cache.get());
+ }
+ }
+
+ static void TearDownTestSuite() {
+ // Disable sync point processing before clearing caches to prevent
+ // background threads from accessing SyncPoint during/after destruction
+ SyncPoint::get_instance()->disable_processing();
+
+ if (ExecEnv::GetInstance()->file_cache_factory() != nullptr) {
+ io::FileCacheFactory::instance()->clear_file_caches(true);
+ }
+
+ // Give background threads time to stop after cache destruction
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ _suite_factory.reset(nullptr);
+ _inverted_index_searcher_cache.reset(nullptr);
+ }
+
+ void SetUp() override {
+ auto st = io::global_local_filesystem()->delete_directory(kTestDir);
+ ASSERT_TRUE(st.ok() || st.is<ErrorCode::NOT_FOUND>()) << st;
+ st = io::global_local_filesystem()->create_directory(kTestDir);
+ ASSERT_TRUE(st.ok()) << st;
+
+ st = io::global_local_filesystem()->delete_directory(kCacheDir);
+ ASSERT_TRUE(st.ok() || st.is<ErrorCode::NOT_FOUND>()) << st;
+ st = io::global_local_filesystem()->create_directory(kCacheDir);
+ ASSERT_TRUE(st.ok()) << st;
+
+ doris::EngineOptions options;
+ _engine = std::make_unique<StorageEngine>(options);
+ _data_dir = std::make_unique<DataDir>(*_engine, kTestDir);
+ ASSERT_TRUE(_data_dir->update_capacity().ok());
+
+ // Setup file cache
+ io::FileCacheSettings settings;
+ settings.storage = "memory";
+ settings.capacity = 1024 * 1024; // 1MB
+ settings.max_file_block_size = 64 * 1024; // 64KB
+ auto cache = std::make_unique<io::BlockFileCache>(kCacheDir, settings);
+ ASSERT_TRUE(cache->initialize());
+ for (int i = 0; i < 1000; ++i) {
+ if (cache->get_async_open_success()) break;
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ auto* raw = cache.get();
+ auto* factory = io::FileCacheFactory::instance();
+ factory->_caches.emplace_back(std::move(cache));
+ factory->_path_to_cache[kCacheDir] = raw;
+ }
+
+ void TearDown() override {
+ // Disable sync point processing before clearing caches to prevent
+ // background threads from accessing SyncPoint during/after destruction
+ SyncPoint::get_instance()->disable_processing();
+
+ if (ExecEnv::GetInstance()->file_cache_factory() != nullptr) {
+ io::FileCacheFactory::instance()->clear_file_caches(true);
+ }
+
+ // Give background threads time to stop after cache destruction
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ auto st = io::global_local_filesystem()->delete_directory(kTestDir);
+ ASSERT_TRUE(st.ok()) << st;
+ st = io::global_local_filesystem()->delete_directory(kCacheDir);
+ ASSERT_TRUE(st.ok() || st.is<ErrorCode::NOT_FOUND>()) << st;
+ }
+
+ // Create schema with inverted index
+ TabletSchemaSPtr create_schema_with_inverted_index() {
+ TabletSchemaSPtr schema = std::make_shared<TabletSchema>();
+
+ // Add INT key column
+ TabletColumn column_1;
+ column_1.set_name("c1");
+ column_1.set_unique_id(0);
+ column_1.set_type(FieldType::OLAP_FIELD_TYPE_INT);
+ column_1.set_length(4);
+ column_1.set_index_length(4);
+ column_1.set_is_key(true);
+ column_1.set_is_nullable(false);
+ schema->append_column(column_1);
+
+ // Add VARCHAR column with inverted index
+ TabletColumn column_2;
+ column_2.set_name("c2");
+ column_2.set_unique_id(1);
+ column_2.set_type(FieldType::OLAP_FIELD_TYPE_VARCHAR);
+ column_2.set_length(65535);
+ column_2.set_is_key(false);
+ column_2.set_is_nullable(false);
+ schema->append_column(column_2);
+
+ // Add inverted index on c2
+ TabletIndex index;
+ index._index_id = 1;
+ index._index_name = "idx_c2";
+ index._index_type = IndexType::INVERTED;
+ index._col_unique_ids.push_back(1);
+ schema->append_index(std::move(index));
+
+ schema->_keys_type = DUP_KEYS;
+ schema->_inverted_index_storage_format =
InvertedIndexStorageFormatPB::V2;
+
+ return schema;
+ }
+
+ std::string create_segment_with_inverted_index(TabletSchemaSPtr schema,
uint32_t segment_id,
+ const RowsetId& rowset_id) {
+ std::string filename = fmt::format("{}_{}.dat", rowset_id.to_string(),
segment_id);
+ std::string seg_path = fmt::format("{}/{}", kTestDir, filename);
+ auto fs = io::global_local_filesystem();
+
+ // Create segment file
+ io::FileWriterPtr file_writer;
+ auto st = fs->create_file(seg_path, &file_writer);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+
+ // Create inverted index file (V2 format)
+ std::string index_path_prefix {
+ InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)};
+ std::string index_path =
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix);
+
+ io::FileWriterPtr idx_file_writer;
+ st = fs->create_file(index_path, &idx_file_writer);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+
+ auto index_file_writer = std::make_unique<InvertedIndexFileWriter>(
+ fs, index_path_prefix, rowset_id.to_string(), segment_id,
+ InvertedIndexStorageFormatPB::V2, std::move(idx_file_writer));
+
+ SegmentWriterOptions opts;
+ SegmentWriter writer(file_writer.get(), segment_id, schema, nullptr,
nullptr, opts,
+ index_file_writer.get());
+ st = writer.init();
+ EXPECT_TRUE(st.ok());
+
+ // Write rows
+ RowCursor row;
+ auto olap_st = row.init(schema);
+ EXPECT_EQ(Status::OK(), olap_st);
+
+ // Write one row: (1, "hello")
+ {
+ RowCursorCell cell0 = row.cell(0);
+ *(int32_t*)cell0.mutable_cell_ptr() = 1;
+ cell0.set_not_null();
+
+ RowCursorCell cell1 = row.cell(1);
+ Slice value("hello");
+ reinterpret_cast<Slice*>(cell1.mutable_cell_ptr())->data =
value.data;
+ reinterpret_cast<Slice*>(cell1.mutable_cell_ptr())->size =
value.size;
+ cell1.set_not_null();
+
+ st = writer.append_row(row);
+ EXPECT_TRUE(st.ok());
+ }
+
+ uint64_t file_size, index_size;
+ st = writer.finalize(&file_size, &index_size);
+ EXPECT_TRUE(st.ok());
+ EXPECT_TRUE(file_writer->close().ok());
+
+ // Close the index file writer (it was already written by
SegmentWriter during finalize)
+ st = index_file_writer->close();
+ EXPECT_TRUE(st.ok()) << st;
+
+ return seg_path;
+ }
+
+protected:
+ std::unique_ptr<StorageEngine> _engine;
+ std::unique_ptr<DataDir> _data_dir;
+
+ inline static std::unique_ptr<io::FileCacheFactory> _suite_factory;
+ inline static std::unique_ptr<InvertedIndexSearcherCache>
_inverted_index_searcher_cache;
+};
+
+// Test that _fs is correctly set when opening segment through CORRUPTION
retry path.
+// This test:
+// 1. Creates a segment with inverted index
+// 2. Injects CORRUPTION error on first open attempt
+// 3. Opens segment successfully after retry
+// 4. Tries to use inverted index (which requires _fs)
+// If _fs is nullptr, the inverted index operation will crash.
+TEST_F(SegmentCorruptionTest, TestFsSetInCorruptionRetryPath) {
+ auto schema = create_schema_with_inverted_index();
+ RowsetId rowset_id;
+ rowset_id.init(1);
+
+ auto path = create_segment_with_inverted_index(schema, 0, rowset_id);
+ auto fs = io::global_local_filesystem();
+
+ // Enable sync point for testing - inject CORRUPTION error on first open
attempt
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+
+ int corruption_count = 0;
+ SyncPoint::CallbackGuard guard;
+ sp->set_call_back(
+ "Segment::open:corruption",
+ [&corruption_count](auto&& args) {
+ // Only trigger corruption on first attempt to simulate file
cache corruption
+ if (corruption_count == 0) {
+ auto* st = try_any_cast<Status*>(args[0]);
+ *st = Status::Corruption<false>("test corruption
injection");
+ corruption_count++;
+ }
+ },
+ &guard);
+
+ std::shared_ptr<Segment> segment;
+ // Use FILE_BLOCK_CACHE to trigger the corruption retry path
+ io::FileReaderOptions reader_options;
+ reader_options.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;
+
+ auto st = Segment::open(fs, path, /*tablet_id=*/100, /*segment_id=*/0,
rowset_id, schema,
+ reader_options, &segment);
+
+ sp->disable_processing();
+
+ // Verify that corruption was injected and retry happened
+ ASSERT_EQ(corruption_count, 1) << "Corruption should have been injected
once";
+
+ // The segment should open successfully after retry
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ ASSERT_NE(segment, nullptr);
+
+ // Now try to use inverted index - this will trigger
_open_index_file_reader()
+ // which uses _fs. If _fs is nullptr, this will crash.
+ auto indexes = schema->inverted_indexs(schema->column(1));
+ ASSERT_FALSE(indexes.empty());
+ const TabletIndex* idx_meta = indexes[0];
+
+ OlapReaderStatistics stats;
+ StorageReadOptions read_options;
+ read_options.stats = &stats;
+ std::unique_ptr<InvertedIndexIterator> iter;
+ // This call triggers _open_index_file_reader() -> uses _fs
+ // If _fs is nullptr (bug not fixed), this will crash
+ st = segment->new_inverted_index_iterator(schema->column(1), idx_meta,
read_options, &iter);
+ st =
segment->_inverted_index_file_reader->init(config::inverted_index_read_buffer_size,
+ &read_options.io_ctx);
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ // The call may fail due to missing index data in this simple test,
+ // but the key point is it should NOT crash due to nullptr _fs
+ // If we reach here, _fs was correctly set
+}
+
+// Test normal segment open path
+TEST_F(SegmentCorruptionTest, TestFsSetInNormalPath) {
+ auto schema = create_schema_with_inverted_index();
+ RowsetId rowset_id;
+ rowset_id.init(2);
+
+ auto path = create_segment_with_inverted_index(schema, 0, rowset_id);
+ auto fs = io::global_local_filesystem();
+
+ std::shared_ptr<Segment> segment;
+ io::FileReaderOptions reader_options;
+
+ auto st = Segment::open(fs, path, /*tablet_id=*/100, /*segment_id=*/0,
rowset_id, schema,
+ reader_options, &segment);
+
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ ASSERT_NE(segment, nullptr);
+
+ // Use inverted index to verify _fs is set
+ auto indexes = schema->inverted_indexs(schema->column(1));
+ ASSERT_FALSE(indexes.empty());
+ const TabletIndex* idx_meta = indexes[0];
+
+ OlapReaderStatistics stats;
+ StorageReadOptions read_options;
+ read_options.stats = &stats;
+ std::unique_ptr<InvertedIndexIterator> iter;
+ st = segment->new_inverted_index_iterator(schema->column(1), idx_meta,
read_options, &iter);
+ // If we reach here without crash, _fs was correctly set
+}
+
+} // namespace doris::segment_v2
diff --git a/run-be-ut.sh b/run-be-ut.sh
index a8f7ae793ba..20245ae2555 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -237,6 +237,10 @@ if [[ -z "${USE_UNWIND}" ]]; then
fi
fi
+if [[ -z "${ENABLE_INJECTION_POINT}" ]]; then
+ ENABLE_INJECTION_POINT='ON'
+fi
+
MAKE_PROGRAM="$(command -v "${BUILD_SYSTEM}")"
echo "-- Make program: ${MAKE_PROGRAM}"
echo "-- Use ccache: ${CMAKE_USE_CCACHE}"
@@ -265,6 +269,7 @@ cd "${CMAKE_BUILD_DIR}"
-DUSE_JEMALLOC=OFF \
-DEXTRA_CXX_FLAGS="${EXTRA_CXX_FLAGS}" \
-DENABLE_CLANG_COVERAGE="${DENABLE_CLANG_COVERAGE}" \
+ -DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \
${CMAKE_USE_CCACHE:+${CMAKE_USE_CCACHE}} \
-DENABLE_PCH="${ENABLE_PCH}" \
-DDORIS_JAVA_HOME="${JAVA_HOME}" \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]