This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new b765f8ab458 [fix](build index) Fix inverted index hardlink leak and 
missing problem #26903 (#27244)
b765f8ab458 is described below

commit b765f8ab458bde41118f97656e20fbf4bb2a7d3d
Author: Kang <[email protected]>
AuthorDate: Sun Nov 19 09:15:16 2023 -0600

    [fix](build index) Fix inverted index hardlink leak and missing problem 
#26903 (#27244)
---
 be/src/olap/rowset/beta_rowset.cpp                 |  52 +++--
 .../fault_injection_p0/test_build_index_fault.out  |  19 ++
 .../test_build_index_fault.groovy                  | 224 +++++++++++++++++++++
 3 files changed, 282 insertions(+), 13 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index c635f48df4b..176ff0d21b5 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -42,6 +42,7 @@
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 
 namespace doris {
@@ -232,21 +233,40 @@ Status BetaRowset::link_files_to(const std::string& dir, 
RowsetId new_rowset_id,
     if (fs->type() != io::FileSystemType::LOCAL) {
         return Status::InternalError("should be local file system");
     }
+
+    Status status;
+    std::vector<string> linked_success_files;
+    Defer remove_linked_files {[&]() { // clear linked files if errors happen
+        if (!status.ok()) {
+            LOG(WARNING) << "will delete linked success files due to error " 
<< status;
+            std::vector<io::Path> paths;
+            for (auto& file : linked_success_files) {
+                paths.emplace_back(file);
+                LOG(WARNING) << "will delete linked success file " << file << 
" due to error";
+            }
+            static_cast<void>(fs->batch_delete(paths));
+            LOG(WARNING) << "done delete linked success files due to error " 
<< status;
+        }
+    }};
+
     io::LocalFileSystem* local_fs = (io::LocalFileSystem*)fs.get();
     for (int i = 0; i < num_segments(); ++i) {
         auto dst_path = segment_file_path(dir, new_rowset_id, i + 
new_rowset_start_seg_id);
         bool dst_path_exist = false;
         if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) {
-            return Status::Error<FILE_ALREADY_EXIST>(
+            status = Status::Error<FILE_ALREADY_EXIST>(
                     "failed to create hard link, file already exist: {}", 
dst_path);
+            return status;
         }
         auto src_path = segment_file_path(i);
         // TODO(lingbin): how external storage support link?
         //     use copy? or keep refcount to avoid being delete?
         if (!local_fs->link_file(src_path, dst_path).ok()) {
-            return Status::Error<OS_ERROR>("fail to create hard link. from={}, 
to={}, errno={}",
-                                           src_path, dst_path, Errno::no());
+            status = Status::Error<OS_ERROR>("fail to create hard link. 
from={}, to={}, errno={}",
+                                             src_path, dst_path, Errno::no());
+            return status;
         }
+        linked_success_files.push_back(dst_path);
         for (auto& index : _schema->indexes()) {
             if (index.index_type() != IndexType::INVERTED) {
                 continue;
@@ -260,25 +280,31 @@ Status BetaRowset::link_files_to(const std::string& dir, 
RowsetId new_rowset_id,
                     InvertedIndexDescriptor::get_index_file_name(src_path, 
index_id);
             std::string inverted_index_dst_file_path =
                     InvertedIndexDescriptor::get_index_file_name(dst_path, 
index_id);
-            bool need_to_link = true;
-            if (_schema->skip_write_index_on_load()) {
-                local_fs->exists(inverted_index_src_file_path, &need_to_link);
-                if (!need_to_link) {
-                    LOG(INFO) << "skip create hard link to not existed file="
-                              << inverted_index_src_file_path;
-                }
-            }
-            if (need_to_link) {
+            bool index_file_exists = true;
+            RETURN_IF_ERROR(local_fs->exists(inverted_index_src_file_path, 
&index_file_exists));
+            if (index_file_exists) {
+                DBUG_EXECUTE_IF(
+                        
"fault_inject::BetaRowset::link_files_to::_link_inverted_index_file", {
+                            status = Status::Error<OS_ERROR>(
+                                    "fault_inject link_file error from={}, 
to={}",
+                                    inverted_index_src_file_path, 
inverted_index_dst_file_path);
+                            return status;
+                        });
                 if (!local_fs->link_file(inverted_index_src_file_path, 
inverted_index_dst_file_path)
                              .ok()) {
-                    return Status::Error<OS_ERROR>(
+                    status = Status::Error<OS_ERROR>(
                             "fail to create hard link. from={}, to={}, 
errno={}",
                             inverted_index_src_file_path, 
inverted_index_dst_file_path,
                             Errno::no());
+                    return status;
                 }
+                linked_success_files.push_back(inverted_index_dst_file_path);
                 LOG(INFO) << "success to create hard link. from=" << 
inverted_index_src_file_path
                           << ", "
                           << "to=" << inverted_index_dst_file_path;
+            } else {
+                LOG(WARNING) << "skip create hard link to not existed index 
file="
+                             << inverted_index_src_file_path;
             }
         }
     }
diff --git a/regression-test/data/fault_injection_p0/test_build_index_fault.out 
b/regression-test/data/fault_injection_p0/test_build_index_fault.out
new file mode 100644
index 00000000000..543ca7ae5ff
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_build_index_fault.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !count1 --
+1000000
+
+-- !count2 --
+1000000
+
+-- !count3 --
+1000000
+
+-- !count4 --
+1000000
+
+-- !count5 --
+1000000
+
+-- !count6 --
+1000000
+
diff --git 
a/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy 
b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
new file mode 100644
index 00000000000..3c8e7bc57f4
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_build_index_fault.groovy
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+suite("test_build_index_fault", "inverted_index"){
+    // prepare test table
+    def timeout = 60000
+    def delta_time = 1000
+    def alter_res = "null"
+    def useTime = 0
+    
+    def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = 
"${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
+            alter_res = alter_res.toString()
+            if(alter_res.contains("FINISHED")) {
+                sleep(3000) // wait change table state to normal
+                logger.info(table_name + " latest alter job finished, detail: 
" + alter_res)
+                break
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish 
timeout")
+    }
+
+    def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}";"""
+            def expected_finished_num = alter_res.size();
+            def finished_num = 0;
+            for (int i = 0; i < expected_finished_num; i++) {
+                logger.info(table_name + " build index job state: " + 
alter_res[i][7] + i)
+                if (alter_res[i][7] == "FINISHED") {
+                    ++finished_num;
+                }
+            }
+            if (finished_num == expected_finished_num) {
+                logger.info(table_name + " all build index jobs finished, 
detail: " + alter_res)
+                break
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, 
"wait_for_latest_build_index_on_partition_finish timeout")
+    }
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}" ORDER BY JobId """
+
+            def last_job_state = alter_res[alter_res.size()-1][7];
+            if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") 
{
+                logger.info(table_name + " last index job finished, state: " + 
last_job_state + ", detail: " + alter_res)
+                return last_job_state;
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, 
"wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def wait_for_last_build_index_on_table_running = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}" ORDER BY JobId """
+
+            def last_job_state = alter_res[alter_res.size()-1][7];
+            if (last_job_state == "RUNNING") {
+                logger.info(table_name + " last index job running, state: " + 
last_job_state + ", detail: " + alter_res)
+                return last_job_state;
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, 
"wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def tableName = "hackernews_1m"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    // create 1 replica table
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` bigint(20) NULL,
+            `deleted` tinyint(4) NULL,
+            `type` text NULL,
+            `author` text NULL,
+            `timestamp` datetime NULL,
+            `comment` text NULL,
+            `dead` tinyint(4) NULL,
+            `parent` bigint(20) NULL,
+            `poll` bigint(20) NULL,
+            `children` array<bigint(20)> NULL,
+            `url` text NULL,
+            `score` int(11) NULL,
+            `title` text NULL,
+            `parts` array<int(11)> NULL,
+            `descendants` int(11) NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`id`) BUCKETS 10
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "is_being_synced" = "false",
+            "storage_format" = "V2",
+            "light_schema_change" = "true",
+            "disable_auto_compaction" = "false",
+            "enable_single_replica_compaction" = "false"
+        );
+    """
+
+    // stream load data
+    streamLoad {
+        table "${tableName}"
+
+        set 'compress_type', 'GZ'
+
+        file """${getS3Url()}/regression/index/hacknernews_1m.csv.gz"""
+
+        time 60000 // limit inflight 60s
+
+        // stream load action will check result, include Success status, and 
NumberTotalRows == NumberLoadedRows
+
+        // if declared a check callback, the default check condition will 
ignore.
+        // So you must check all condition
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+        }
+    }
+
+    sql "sync"
+
+    // check data
+    qt_count1 """ SELECT COUNT() from ${tableName}; """
+
+    // ADD INDEX
+    sql """ ALTER TABLE ${tableName} ADD INDEX idx_comment (`comment`) USING 
INVERTED PROPERTIES("parser" = "english") """
+
+    wait_for_latest_op_on_table_finish(tableName, timeout)
+
+    // BUILD INDEX and expect state is RUNNING
+    sql """ BUILD INDEX idx_comment ON ${tableName} """
+    def state = wait_for_last_build_index_on_table_running(tableName, timeout)
+    def result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" 
ORDER BY JobId """
+    assertEquals(result[result.size()-1][1], tableName)
+    assertTrue(result[result.size()-1][3].contains("ADD INDEX"))
+    assertEquals(result[result.size()-1][7], "RUNNING")
+
+    // CANCEL BUILD INDEX and expect state is CANCELED
+    sql """ CANCEL BUILD INDEX ON ${tableName} (${result[result.size()-1][0]}) 
"""
+    result = sql """ SHOW BUILD INDEX WHERE TableName = "${tableName}" ORDER 
BY JobId """
+    assertEquals(result[result.size()-1][1], tableName)
+    assertTrue(result[result.size()-1][3].contains("ADD INDEX"))
+    assertEquals(result[result.size()-1][7], "CANCELLED")
+    assertEquals(result[result.size()-1][8], "user cancelled")
+    // check data
+    qt_count2 """ SELECT COUNT() from ${tableName}; """
+
+    // BUILD INDEX and expect state is FINISHED
+    sql """ BUILD INDEX idx_comment ON ${tableName}; """
+    state = wait_for_last_build_index_on_table_finish(tableName, timeout)
+    assertEquals(state, "FINISHED")
+    // check data
+    qt_count3 """ SELECT COUNT() from ${tableName}; """
+
+    // CANCEL BUILD INDEX in FINISHED state and expect exception
+    def success = false;
+    try {
+        sql """ CANCEL BUILD INDEX ON ${tableName}; """
+        success = true
+    } catch(Exception ex) {
+        logger.info(" CANCEL BUILD INDEX ON ${tableName} exception: " + ex)
+    }
+    assertFalse(success)
+
+    // BUILD INDEX again and expect state is FINISHED
+    sql """ BUILD INDEX idx_comment ON ${tableName}; """
+    state = wait_for_last_build_index_on_table_finish(tableName, timeout)
+    assertEquals(state, "FINISHED")
+    // check data
+    qt_count4 """ SELECT COUNT() from ${tableName}; """
+
+    // BUILD INDEX with error injection
+    sql """ ALTER TABLE ${tableName} ADD INDEX idx_title (`title`) USING 
INVERTED """
+    // enable error_inject for BetaRowset link inverted index file and expect 
state is RUNNING
+    
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file")
+    sql """ BUILD INDEX idx_title ON ${tableName}; """
+    state = wait_for_last_build_index_on_table_finish(tableName, timeout)
+    assertEquals(state, "wait_timeout")
+    // check data
+    qt_count5 """ SELECT COUNT() from ${tableName}; """
+
+    // disable error_inject for BetaRowset link inverted index file and expect 
state is FINISHED
+    
GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::link_files_to::_link_inverted_index_file")
+    // timeout * 10 for possible fe schedule delay
+    state = wait_for_last_build_index_on_table_finish(tableName, timeout * 10)
+    assertEquals(state, "FINISHED")
+    // check data
+    qt_count6 """ SELECT COUNT() from ${tableName}; """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to