This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new b765f8ab458 [fix](build index) Fix inverted index hardlink leak and
missing problem #26903 (#27244)
b765f8ab458 is described below
commit b765f8ab458bde41118f97656e20fbf4bb2a7d3d
Author: Kang <[email protected]>
AuthorDate: Sun Nov 19 09:15:16 2023 -0600
[fix](build index) Fix inverted index hardlink leak and missing problem
#26903 (#27244)
---
be/src/olap/rowset/beta_rowset.cpp | 52 +++--
.../fault_injection_p0/test_build_index_fault.out | 19 ++
.../test_build_index_fault.groovy | 224 +++++++++++++++++++++
3 files changed, 282 insertions(+), 13 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index c635f48df4b..176ff0d21b5 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -42,6 +42,7 @@
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
namespace doris {
@@ -232,21 +233,40 @@ Status BetaRowset::link_files_to(const std::string& dir,
RowsetId new_rowset_id,
if (fs->type() != io::FileSystemType::LOCAL) {
return Status::InternalError("should be local file system");
}
+
+ Status status;
+ std::vector<string> linked_success_files;
+ Defer remove_linked_files {[&]() { // clear linked files if errors happen
+ if (!status.ok()) {
+ LOG(WARNING) << "will delete linked success files due to error "
<< status;
+ std::vector<io::Path> paths;
+ for (auto& file : linked_success_files) {
+ paths.emplace_back(file);
+ LOG(WARNING) << "will delete linked success file " << file <<
" due to error";
+ }
+ static_cast<void>(fs->batch_delete(paths));
+ LOG(WARNING) << "done delete linked success files due to error "
<< status;
+ }
+ }};
+
io::LocalFileSystem* local_fs = (io::LocalFileSystem*)fs.get();
for (int i = 0; i < num_segments(); ++i) {
auto dst_path = segment_file_path(dir, new_rowset_id, i +
new_rowset_start_seg_id);
bool dst_path_exist = false;
if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) {
- return Status::Error<FILE_ALREADY_EXIST>(
+ status = Status::Error<FILE_ALREADY_EXIST>(
"failed to create hard link, file already exist: {}",
dst_path);
+ return status;
}
auto src_path = segment_file_path(i);
// TODO(lingbin): how external storage support link?
// use copy? or keep refcount to avoid being delete?
if (!local_fs->link_file(src_path, dst_path).ok()) {
- return Status::Error<OS_ERROR>("fail to create hard link. from={},
to={}, errno={}",
- src_path, dst_path, Errno::no());
+ status = Status::Error<OS_ERROR>("fail to create hard link.
from={}, to={}, errno={}",
+ src_path, dst_path, Errno::no());
+ return status;
}
+ linked_success_files.push_back(dst_path);
for (auto& index : _schema->indexes()) {
if (index.index_type() != IndexType::INVERTED) {
continue;
@@ -260,25 +280,31 @@ Status BetaRowset::link_files_to(const std::string& dir,
RowsetId new_rowset_id,
InvertedIndexDescriptor::get_index_file_name(src_path,
index_id);
std::string inverted_index_dst_file_path =
InvertedIndexDescriptor::get_index_file_name(dst_path,
index_id);
- bool need_to_link = true;
- if (_schema->skip_write_index_on_load()) {
- local_fs->exists(inverted_index_src_file_path, &need_to_link);
- if (!need_to_link) {
- LOG(INFO) << "skip create hard link to not existed file="
- << inverted_index_src_file_path;
- }
- }
- if (need_to_link) {
+ bool index_file_exists = true;
+ RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path,
&index_file_exists));
+ if (index_file_exists) {
+ DBUG_EXECUTE_IF(
+
"fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", {
+ status = Status::Error<OS_ERROR>(
+ "fault_inject link_file error from={},
to={}",
+ inverted_index_src_file_path,
inverted_index_dst_file_path);
+ return status;
+ });
if (!local_fs->link_file(inverted_index_src_file_path,
inverted_index_dst_file_path)
.ok()) {
- return Status::Error<OS_ERROR>(
+ status = Status::Error<OS_ERROR>(
"fail to create hard link. from={}, to={},
errno={}",
inverted_index_src_file_path,
inverted_index_dst_file_path,
Errno::no());
+ return status;
}
+ linked_success_files.push_back(inverted_index_dst_file_path);
LOG(INFO) << "success to create hard link. from=" <<
inverted_index_src_file_path
<< ", "
<< "to=" << inverted_index_dst_file_path;
+ } else {
+ LOG(WARNING) << "skip create hard link to not existed index
file="
+ << inverted_index_src_file_path;
}
}
}
diff --git a/regression-test/data/fault_injection_p0/test_build_index_fault.out
b/regression-test/data/fault_injection_p0/test_build_index_fault.out
new file mode 100644
index 00000000000..543ca7ae5ff
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_build_index_fault.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !count1 --
+1000000
+
+-- !count2 --
+1000000
+
+-- !count3 --
+1000000
+
+-- !count4 --
+1000000
+
+-- !count5 --
+1000000
+
+-- !count6 --
+1000000
+
diff --git
a/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
new file mode 100644
index 00000000000..3c8e7bc57f4
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_build_index_fault", "inverted_index"){
+ // prepare test table
+ def timeout = 60000
+ def delta_time = 1000
+ def alter_res = "null"
+ def useTime = 0
+
+ def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName =
"${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
+ alter_res = alter_res.toString()
+ if(alter_res.contains("FINISHED")) {
+ sleep(3000) // wait change table state to normal
+ logger.info(table_name + " latest alter job finished, detail:
" + alter_res)
+ break
+ }
+ useTime = t
+ sleep(delta_time)
+ }
+ assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish
timeout")
+ }
+
+ def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW BUILD INDEX WHERE TableName =
"${table_name}";"""
+ def expected_finished_num = alter_res.size();
+ def finished_num = 0;
+ for (int i = 0; i < expected_finished_num; i++) {
+ logger.info(table_name + " build index job state: " +
alter_res[i][7] + i)
+ if (alter_res[i][7] == "FINISHED") {
+ ++finished_num;
+ }
+ }
+ if (finished_num == expected_finished_num) {
+ logger.info(table_name + " all build index jobs finished,
detail: " + alter_res)
+ break
+ }
+ useTime = t
+ sleep(delta_time)
+ }
+ assertTrue(useTime <= OpTimeout,
"wait_for_latest_build_index_on_partition_finish timeout")
+ }
+
+ def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW BUILD INDEX WHERE TableName =
"${table_name}" ORDER BY JobId """
+
+ def last_job_state = alter_res[alter_res.size()-1][7];
+ if (last_job_state == "FINISHED" || last_job_state == "CANCELLED")
{
+ logger.info(table_name + " last index job finished, state: " +
last_job_state + ", detail: " + alter_res)
+ return last_job_state;
+ }
+ useTime = t
+ sleep(delta_time)
+ }
+ assertTrue(useTime <= OpTimeout,
"wait_for_last_build_index_on_table_finish timeout")
+ return "wait_timeout"
+ }
+
+ def wait_for_last_build_index_on_table_running = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW BUILD INDEX WHERE TableName =
"${table_name}" ORDER BY JobId """
+
+ def last_job_state = alter_res[alter_res.size()-1][7];
+ if (last_job_state == "RUNNING") {
+ logger.info(table_name + " last index job running, state: " +
last_job_state + ", detail: " + alter_res)
+ return last_job_state;
+ }
+ useTime = t
+ sleep(delta_time)
+ }
+ assertTrue(useTime <= OpTimeout,
"wait_for_last_build_index_on_table_finish timeout")
+ return "wait_timeout"
+ }
+
+ def tableName = "hackernews_1m"
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ // create 1 replica table
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` bigint(20) NULL,
+ `deleted` tinyint(4) NULL,
+ `type` text NULL,
+ `author` text NULL,
+ `timestamp` datetime NULL,
+ `comment` text NULL,
+ `dead` tinyint(4) NULL,
+ `parent` bigint(20) NULL,
+ `poll` bigint(20) NULL,
+ `children` array<bigint(20)> NULL,
+ `url` text NULL,
+ `score` int(11) NULL,
+ `title` text NULL,
+ `parts` array<int(11)> NULL,
+ `descendants` int(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "is_being_synced" = "false",
+ "storage_format" = "V2",
+ "light_schema_change" = "true",
+ "disable_auto_compaction" = "false",
+ "enable_single_replica_compaction" = "false"
+ );
+ """
+
+ // stream load data
+ streamLoad {
+ table "${tableName}"
+
+ set 'compress_type', 'GZ'
+
+ file """${getS3Url()}/regression/index/hacknernews_1m.csv.gz"""
+
+ time 60000 // limit inflight 60s
+
+ // stream load action will check result, include Success status, and
NumberTotalRows == NumberLoadedRows
+
+        // If a check callback is declared, the default check condition will be
ignored,
+        // so you must check all conditions yourself.
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+
+ sql "sync"
+
+ // check data
+ qt_count1 """ SELECT COUNT() from ${tableName}; """
+
+ // ADD INDEX
+ sql """ ALTER TABLE ${tableName} ADD INDEX idx_comment (`comment`) USING
INVERTED PROPERTIES("parser" = "english") """
+
+ wait_for_latest_op_on_table_finish(tableName, timeout)
+
+ // BUILD INDEX and expect state is RUNNING
+ sql """ BUILD INDEX idx_comment ON ${tableName} """
+ def state = wait_for_last_build_index_on_table_running(tableName, timeout)
+ def result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}"
ORDER BY JobId """
+ assertEquals(result[result.size()-1][1], tableName)
+ assertTrue(result[result.size()-1][3].contains("ADD INDEX"))
+ assertEquals(result[result.size()-1][7], "RUNNING")
+
+ // CANCEL BUILD INDEX and expect state is CANCELED
+ sql """ CANCEL BUILD INDEX ON ${tableName} (${result[result.size()-1][0]})
"""
+ result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" ORDER
BY JobId """
+ assertEquals(result[result.size()-1][1], tableName)
+ assertTrue(result[result.size()-1][3].contains("ADD INDEX"))
+ assertEquals(result[result.size()-1][7], "CANCELLED")
+ assertEquals(result[result.size()-1][8], "user cancelled")
+ // check data
+ qt_count2 """ SELECT COUNT() from ${tableName}; """
+
+ // BUILD INDEX and expect state is FINISHED
+ sql """ BUILD INDEX idx_comment ON ${tableName}; """
+ state = wait_for_last_build_index_on_table_finish(tableName, timeout)
+ assertEquals(state, "FINISHED")
+ // check data
+ qt_count3 """ SELECT COUNT() from ${tableName}; """
+
+ // CANCEL BUILD INDEX in FINISHED state and expect exception
+ def success = false;
+ try {
+ sql """ CANCEL BUILD INDEX ON ${tableName}; """
+ success = true
+ } catch(Exception ex) {
+ logger.info(" CANCEL BUILD INDEX ON ${tableName} exception: " + ex)
+ }
+ assertFalse(success)
+
+ // BUILD INDEX again and expect state is FINISHED
+ sql """ BUILD INDEX idx_comment ON ${tableName}; """
+ state = wait_for_last_build_index_on_table_finish(tableName, timeout)
+ assertEquals(state, "FINISHED")
+ // check data
+ qt_count4 """ SELECT COUNT() from ${tableName}; """
+
+ // BUILD INDEX with error injection
+ sql """ ALTER TABLE ${tableName} ADD INDEX idx_title (`title`) USING
INVERTED """
+    // enable error_inject for BetaRowset link inverted index file and expect
state is RUNNING
+
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file")
+ sql """ BUILD INDEX idx_title ON ${tableName}; """
+ state = wait_for_last_build_index_on_table_finish(tableName, timeout)
+ assertEquals(state, "wait_timeout")
+ // check data
+ qt_count5 """ SELECT COUNT() from ${tableName}; """
+
+ // disable error_inject for BetaRowset link inverted index file and expect
state is FINISHED
+
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file")
+ // timeout * 10 for possible fe schedule delay
+ state = wait_for_last_build_index_on_table_finish(tableName, timeout * 10)
+ assertEquals(state, "FINISHED")
+ // check data
+ qt_count6 """ SELECT COUNT() from ${tableName}; """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]