This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 9f2d88b4867 [fix](clone) Fix engine_clone file exist (#27361) (#27536)
9f2d88b4867 is described below

commit 9f2d88b4867231309be67ee93cde56d99763fe5c
Author: Jack Drogon <[email protected]>
AuthorDate: Fri Nov 24 17:43:09 2023 +0800

    [fix](clone) Fix engine_clone file exist (#27361) (#27536)
---
 be/src/common/status.h                 |  2 +
 be/src/olap/task/engine_clone_task.cpp | 79 +++++++++++++++++++++++++++++++---
 2 files changed, 74 insertions(+), 7 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index b1164237130..88981fe808b 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -595,6 +595,8 @@ inline std::string Status::to_string() const {
 template <typename T>
 using Result = expected<T, Status>;
 
+using ResultError = unexpected<Status>;
+
 #define RETURN_IF_ERROR_RESULT(stmt)                \
     do {                                            \
         Status _status_ = (stmt);                   \
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 3621a958eff..86ba6ac2b9f 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -74,6 +74,58 @@ using strings::SkipWhitespace;
 namespace doris {
 using namespace ErrorCode;
 
+namespace {
+/// Resolves the destination path `{tablet_dir}/_binlog/<clone_file with its last 7 chars replaced by ".dat">` for a cloned binlog file.
+/// Destination missing: returns that path and leaves `*skip_link_file` untouched (this function only ever sets it to true, never to false). Destination present with an equal md5sum: sets `*skip_link_file = true` and returns the path.
+/// Destination present with a different md5sum: logs a warning and returns Status::InternalError; a failing exists()/md5sum() call is returned as-is, both wrapped in ResultError.
+/// NOTE(review): `clone_file` is joined onto tablet_dir as a bare file name to build the destination, yet it is passed unchanged to md5sum() -- confirm that it resolves to the downloaded source file.
+Result<std::string> check_dest_binlog_valid(const std::string& tablet_dir,
+                                            const std::string& clone_file, 
bool* skip_link_file) {
+    // Swap the trailing 7 characters (the ".binlog" suffix) for ".dat"; assumes clone_file.size() >= 7 -- TODO confirm callers guarantee this.
+    std::string new_clone_file = clone_file;
+    new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
+    auto to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
+
+    // Check whether the destination file `to` already exists.
+    bool exists = true;
+    auto status = io::global_local_filesystem()->exists(to, &exists);
+    if (!status.ok()) {
+        return ResultError(std::move(status));
+    }
+
+    if (!exists) {
+        return to;
+    }
+
+    LOG(WARNING) << "binlog file already exist. "
+                 << "tablet_dir=" << tablet_dir << ", clone_file=" << 
clone_file;
+
+    std::string clone_file_md5sum;
+    status = io::global_local_filesystem()->md5sum(clone_file, 
&clone_file_md5sum);
+    if (!status.ok()) {
+        return ResultError(std::move(status));
+    }
+    std::string to_file_md5sum;
+    status = io::global_local_filesystem()->md5sum(to, &to_file_md5sum);
+    if (!status.ok()) {
+        return ResultError(std::move(status));
+    }
+
+    if (clone_file_md5sum == to_file_md5sum) {
+        // Identical content is already in place: tell the caller to skip linking this file.
+        *skip_link_file = true;
+        return to;
+    } else {
+        auto err_msg = fmt::format(
+                "binlog file already exist, but md5sum not equal. "
+                "tablet_dir={}, clone_file={}",
+                tablet_dir, clone_file);
+        LOG(WARNING) << err_msg;
+        return ResultError(Status::InternalError(std::move(err_msg)));
+    }
+}
+} // namespace
+
 #define RETURN_IF_ERROR_(status, stmt) \
     do {                               \
         status = (stmt);               \
@@ -603,6 +655,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const 
std::string& clone_d
     /// Traverse all downloaded clone files in CLONE dir.
     /// If it does not exist in local tablet dir, link the file to local 
tablet dir
     /// And save all linked files in linked_success_files.
+    /// if binlog exist in clone dir and md5sum equal, then skip link file
+    bool skip_link_file = false;
     for (const string& clone_file : clone_file_names) {
         if (local_file_names.find(clone_file) != local_file_names.end()) {
             VLOG_NOTICE << "find same file when clone, skip it. "
@@ -619,19 +673,30 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, 
const std::string& clone_d
                 break;
             }
 
-            // change clone_file suffix .binlog to .dat
-            std::string new_clone_file = clone_file;
-            new_clone_file.replace(clone_file.size() - 7, 7, ".dat");
-            to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file);
+            if (auto&& result = check_dest_binlog_valid(tablet_dir, 
clone_file, &skip_link_file);
+                result) {
+                to = std::move(result.value());
+            } else {
+                status = std::move(result.error());
+                return status;
+            }
         } else {
             to = fmt::format("{}/{}", tablet_dir, clone_file);
         }
 
-        RETURN_IF_ERROR(io::global_local_filesystem()->link_file(from, to));
-        linked_success_files.emplace_back(std::move(to));
+        if (!skip_link_file) {
+            status = io::global_local_filesystem()->link_file(from, to);
+            if (!status.ok()) {
+                return status;
+            }
+            linked_success_files.emplace_back(std::move(to));
+        }
     }
     if (contain_binlog) {
-        RETURN_IF_ERROR(tablet->ingest_binlog_metas(&rowset_binlog_metas_pb));
+        status = tablet->ingest_binlog_metas(&rowset_binlog_metas_pb);
+        if (!status.ok()) {
+            return status;
+        }
     }
 
     // clone and compaction operation should be performed sequentially


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to