This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new bf11d48171c [branch-2.0] Picks "[Fix](partial update) Persist
partial_update_info in RocksDB in case of BE restart after a partial update has
committed #38331" (#39078)
bf11d48171c is described below

commit bf11d48171cdea34920e23b31d5fe298efd3ba36
Author: bobhan1 <[email protected]>
AuthorDate: Fri Aug 9 21:45:45 2024 +0800

    [branch-2.0] Picks "[Fix](partial update) Persist partial_update_info in
    RocksDB in case of BE restart after a partial update has committed #38331"
    (#39078)
    
    picks https://github.com/apache/doris/pull/38331 and
    https://github.com/apache/doris/pull/39066
---
 be/src/olap/partial_update_info.cpp                | 142 +++++++++++++++++++
 be/src/olap/partial_update_info.h                  |  63 ++-------
 be/src/olap/rowset/rowset_meta_manager.cpp         |  95 +++++++++++++
 be/src/olap/rowset/rowset_meta_manager.h           |  18 +++
 be/src/olap/storage_engine.cpp                     |  31 ++++
 be/src/olap/storage_engine.h                       |   2 +
 be/src/olap/txn_manager.cpp                        |  73 +++++++++-
 be/src/olap/txn_manager.h                          |   8 +-
 gensrc/proto/olap_file.proto                       |  15 ++
 .../unique_with_mow_p0/partial_update/data1.csv    |   2 +
 .../test_partial_update_conflict_be_restart.out    |  21 +++
 .../test_partial_update_conflict_be_restart.groovy | 156 +++++++++++++++++++++
 12 files changed, 567 insertions(+), 59 deletions(-)

diff --git a/be/src/olap/partial_update_info.cpp 
b/be/src/olap/partial_update_info.cpp
new file mode 100644
index 00000000000..8bb5bb97ca4
--- /dev/null
+++ b/be/src/olap/partial_update_info.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/partial_update_info.h"
+
+#include <gen_cpp/olap_file.pb.h>
+
+#include "olap/tablet_schema.h"
+#include "util/string_util.h"
+
+namespace doris {
+
+// Initialize from the tablet schema and the load's partial-update settings.
+//
+// Every column of `tablet_schema` is classified as an update column (its name is
+// in `partial_update_cols`) or a missing column. If any missing column has
+// neither a default value nor is nullable, new rows cannot be inserted by this
+// partial update. Finally a default value is precomputed for each missing
+// column.
+//
+// All derived state is reset first, so init() may be called more than once on
+// the same object.
+void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
+                             const std::set<std::string>& partial_update_cols, bool is_strict_mode,
+                             int64_t timestamp_ms, const std::string& timezone,
+                             int64_t cur_max_version) {
+    is_partial_update = partial_update;
+    partial_update_input_columns = partial_update_cols;
+    max_version_in_flush_phase = cur_max_version;
+    this->timestamp_ms = timestamp_ms;
+    this->timezone = timezone;
+    missing_cids.clear();
+    update_cids.clear();
+    // _generate_default_values_for_missing_cids() appends to default_values and
+    // then CHECKs that it has the same size as missing_cids, so entries left over
+    // from an earlier init() would abort the process.
+    default_values.clear();
+    // The loop below only ever clears this flag, so start from "insertable".
+    can_insert_new_rows_in_partial_update = true;
+    for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
+        // Bind by reference; the previous `auto` copied the column on every iteration.
+        const auto& tablet_column = tablet_schema.column(i);
+        if (!partial_update_input_columns.contains(tablet_column.name())) {
+            missing_cids.emplace_back(i);
+            if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
+                can_insert_new_rows_in_partial_update = false;
+            }
+        } else {
+            update_cids.emplace_back(i);
+        }
+    }
+    this->is_strict_mode = is_strict_mode;
+    _generate_default_values_for_missing_cids(tablet_schema);
+}
+
+// Copy every field of this object into `partial_update_info_pb` so that it can be
+// persisted and later restored with from_pb(). Repeated fields are appended in
+// the iteration order of the corresponding C++ container.
+void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
+    auto* pb = partial_update_info_pb;
+    // scalar fields
+    pb->set_is_partial_update(is_partial_update);
+    pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
+    pb->set_can_insert_new_rows_in_partial_update(can_insert_new_rows_in_partial_update);
+    pb->set_is_strict_mode(is_strict_mode);
+    pb->set_timestamp_ms(timestamp_ms);
+    pb->set_timezone(timezone);
+    // repeated fields
+    for (const auto& column_name : partial_update_input_columns) {
+        pb->add_partial_update_input_columns(column_name);
+    }
+    for (const auto missing_cid : missing_cids) {
+        pb->add_missing_cids(missing_cid);
+    }
+    for (const auto update_cid : update_cids) {
+        pb->add_update_cids(update_cid);
+    }
+    for (const auto& default_value : default_values) {
+        pb->add_default_values(default_value);
+    }
+}
+
+// Overwrite this object with the contents of a message produced by to_pb().
+// Containers are replaced, not appended to. A message that does not carry
+// max_version_in_flush_phase yields -1.
+void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
+    const auto& pb = *partial_update_info_pb;
+    is_partial_update = pb.is_partial_update();
+    max_version_in_flush_phase =
+            pb.has_max_version_in_flush_phase() ? pb.max_version_in_flush_phase() : -1;
+    partial_update_input_columns.clear();
+    partial_update_input_columns.insert(pb.partial_update_input_columns().begin(),
+                                        pb.partial_update_input_columns().end());
+    missing_cids.assign(pb.missing_cids().begin(), pb.missing_cids().end());
+    update_cids.assign(pb.update_cids().begin(), pb.update_cids().end());
+    can_insert_new_rows_in_partial_update = pb.can_insert_new_rows_in_partial_update();
+    is_strict_mode = pb.is_strict_mode();
+    timestamp_ms = pb.timestamp_ms();
+    timezone = pb.timezone();
+    default_values.assign(pb.default_values().begin(), pb.default_values().end());
+}
+
+// Short human-readable description for log lines: the number of update and
+// missing columns, the strict-mode flag and max_version_in_flush_phase.
+std::string PartialUpdateInfo::summary() const {
+    const auto num_update_cids = update_cids.size();
+    const auto num_missing_cids = missing_cids.size();
+    return fmt::format(
+            "update_cids={}, missing_cids={}, is_strict_mode={}, max_version_in_flush_phase={}",
+            num_update_cids, num_missing_cids, is_strict_mode, max_version_in_flush_phase);
+}
+
+// Append one entry to default_values for every element of missing_cids, in the
+// same order:
+//  - DATETIMEV2 column whose default contains CURRENT_TIMESTAMP, or DATEV2 column
+//    whose default contains CURRENT_DATE (case-insensitive): the value is built
+//    from this load's timestamp_ms (seconds part) in `timezone`;
+//  - any other column with a default: the default value string as-is;
+//  - column without a default: an empty placeholder string.
+void PartialUpdateInfo::_generate_default_values_for_missing_cids(
+        const TabletSchema& tablet_schema) {
+    for (unsigned int cid : missing_cids) {
+        const auto& column = tablet_schema.column(cid);
+        if (!column.has_default_value()) {
+            // placeholder keeps default_values index-aligned with missing_cids
+            default_values.emplace_back();
+            continue;
+        }
+        const bool is_datetimev2 = column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2;
+        const bool is_datev2 = column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2;
+        if (UNLIKELY(is_datetimev2 &&
+                     to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
+                             std::string::npos)) {
+            vectorized::DateV2Value<vectorized::DateTimeV2ValueType> load_datetime;
+            load_datetime.from_unixtime(timestamp_ms / 1000, timezone);
+            default_values.emplace_back(load_datetime.debug_string());
+        } else if (UNLIKELY(is_datev2 &&
+                            to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
+                                    std::string::npos)) {
+            vectorized::DateV2Value<vectorized::DateV2ValueType> load_date;
+            load_date.from_unixtime(timestamp_ms / 1000, timezone);
+            default_values.emplace_back(load_date.debug_string());
+        } else {
+            default_values.emplace_back(column.default_value());
+        }
+    }
+    CHECK_EQ(missing_cids.size(), default_values.size());
+} // namespace doris
diff --git a/be/src/olap/partial_update_info.h 
b/be/src/olap/partial_update_info.h
index 46d921e6e60..fe85e8d19f5 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -16,64 +16,25 @@
 // under the License.
 
 #pragma once
-
-#include "olap/tablet_schema.h"
-#include "util/string_util.h"
+#include <cstdint>
+#include <set>
+#include <string>
+#include <vector>
 
 namespace doris {
+class TabletSchema;
+class PartialUpdateInfoPB;
 
 struct PartialUpdateInfo {
     void init(const TabletSchema& tablet_schema, bool partial_update,
-              const std::set<string>& partial_update_cols, bool is_strict_mode,
-              int64_t timestamp_ms, const std::string& timezone, int64_t 
cur_max_version = -1) {
-        is_partial_update = partial_update;
-        partial_update_input_columns = partial_update_cols;
-        max_version_in_flush_phase = cur_max_version;
-        this->timestamp_ms = timestamp_ms;
-        this->timezone = timezone;
-        missing_cids.clear();
-        update_cids.clear();
-        for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
-            auto tablet_column = tablet_schema.column(i);
-            if (!partial_update_input_columns.contains(tablet_column.name())) {
-                missing_cids.emplace_back(i);
-                if (!tablet_column.has_default_value() && 
!tablet_column.is_nullable()) {
-                    can_insert_new_rows_in_partial_update = false;
-                }
-            } else {
-                update_cids.emplace_back(i);
-            }
-        }
-        this->is_strict_mode = is_strict_mode;
-        _generate_default_values_for_missing_cids(tablet_schema);
-    }
+              const std::set<std::string>& partial_update_cols, bool 
is_strict_mode,
+              int64_t timestamp_ms, const std::string& timezone, int64_t 
cur_max_version = -1);
+    void to_pb(PartialUpdateInfoPB* partial_update_info) const;
+    void from_pb(PartialUpdateInfoPB* partial_update_info);
+    std::string summary() const;
 
 private:
-    void _generate_default_values_for_missing_cids(const TabletSchema& 
tablet_schema) {
-        for (auto i = 0; i < missing_cids.size(); ++i) {
-            auto cur_cid = missing_cids[i];
-            const auto& column = tablet_schema.column(cur_cid);
-            if (column.has_default_value()) {
-                std::string default_value;
-                if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
-                                     FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-                             
to_lower(tablet_schema.column(cur_cid).default_value())
-                                             
.find(to_lower("CURRENT_TIMESTAMP")) !=
-                                     std::string::npos)) {
-                    vectorized::DateV2Value<vectorized::DateTimeV2ValueType> 
dtv;
-                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
-                    default_value = dtv.debug_string();
-                } else {
-                    default_value = 
tablet_schema.column(cur_cid).default_value();
-                }
-                default_values.emplace_back(default_value);
-            } else {
-                // place an empty string here
-                default_values.emplace_back();
-            }
-        }
-        CHECK_EQ(missing_cids.size(), default_values.size());
-    }
+    void _generate_default_values_for_missing_cids(const TabletSchema& 
tablet_schema);
 
 public:
     bool is_partial_update {false};
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp 
b/be/src/olap/rowset/rowset_meta_manager.cpp
index f5dc8101ea0..db0ad8f3d45 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -39,6 +39,7 @@
 namespace doris {
 namespace {
 const std::string ROWSET_PREFIX = "rst_";
+const std::string PARTIAL_UPDATE_INFO_PREFIX = "pui_";
 } // namespace
 
 using namespace ErrorCode;
@@ -535,4 +536,98 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta* 
meta,
     return status;
 }
 
+// Serialize `partial_update_info_pb` and store it in the meta column family under
+// the key pui_{tablet_id}_{partition_id}_{txn_id}.
+// Returns SERIALIZE_PROTOBUF_ERROR if serialization fails, else the put() status.
+Status RowsetMetaManager::save_partial_update_info(
+        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
+        const PartialUpdateInfoPB& partial_update_info_pb) {
+    const std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    std::string serialized;
+    if (partial_update_info_pb.SerializeToString(&serialized)) {
+        VLOG_NOTICE << "save partial update info, key=" << key
+                    << ", value_size=" << serialized.size();
+        return meta->put(META_COLUMN_FAMILY_INDEX, key, serialized);
+    }
+    return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize partial update info failed. key={}",
+                                                   key);
+}
+
+// Read and decode the info stored under pui_{tablet_id}_{partition_id}_{txn_id}.
+// A META_KEY_NOT_FOUND status is returned as-is without logging. Any other read
+// error is logged and returned. A stored value that cannot be decoded yields
+// PARSE_PROTOBUF_ERROR.
+Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                                      int64_t partition_id, int64_t txn_id,
+                                                      PartialUpdateInfoPB* partial_update_info_pb) {
+    const std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    std::string serialized;
+    Status read_status = meta->get(META_COLUMN_FAMILY_INDEX, key, &serialized);
+    if (!read_status.ok()) {
+        if (!read_status.is<META_KEY_NOT_FOUND>()) {
+            LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
+                        tablet_id, partition_id, txn_id);
+        }
+        return read_status;
+    }
+    if (partial_update_info_pb->ParseFromString(serialized)) {
+        return Status::OK();
+    }
+    return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
+            "fail to parse partial update info content to protobuf object. tablet_id={}, "
+            "partition_id={}, txn_id={}",
+            tablet_id, partition_id, txn_id);
+}
+
+// Visit every persisted partial update info (keys with the "pui_" prefix) and
+// call `func(tablet_id, partition_id, txn_id, serialized_value)` for each one.
+// Iteration stops as soon as `func` returns false. A key that does not have the
+// pui_{tablet_id}_{partition_id}_{txn_id} shape is logged and skipped, rather
+// than aborting the traversal or throwing.
+Status RowsetMetaManager::traverse_partial_update_info(
+        OlapMeta* meta,
+        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
+    auto traverse_partial_update_info_func = [&func](const std::string& key,
+                                                     const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
+        // This callback returns bool, so a split failure is handled explicitly
+        // instead of leaking a Status through RETURN_IF_ERROR. That macro would
+        // silently end the whole traversal via an implicit Status->bool conversion.
+        Status st = split_string(key, '_', &parts);
+        if (!st.ok()) {
+            LOG_WARNING("failed to split partial update info key={}, err={}", key,
+                        st.to_string());
+            return true;
+        }
+        if (parts.size() != 4) {
+            LOG_WARNING("invalid partial update info key={}, splitted size={}", key,
+                        parts.size());
+            return true;
+        }
+        int64_t tablet_id = 0;
+        int64_t partition_id = 0;
+        int64_t txn_id = 0;
+        try {
+            tablet_id = std::stoll(parts[1]);
+            partition_id = std::stoll(parts[2]);
+            txn_id = std::stoll(parts[3]);
+        } catch (const std::exception& e) {
+            // std::stoll throws on non-numeric or out-of-range text. A corrupt key
+            // must not crash the periodic trash sweep that drives this traversal.
+            LOG_WARNING("invalid partial update info key={}, err={}", key, e.what());
+            return true;
+        }
+        return func(tablet_id, partition_id, txn_id, value);
+    };
+    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
+                         traverse_partial_update_info_func);
+}
+
+// Delete the info stored under pui_{tablet_id}_{partition_id}_{txn_id} and
+// return the status of the meta remove().
+Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                                     int64_t partition_id, int64_t txn_id) {
+    const std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    Status removed = meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    VLOG_NOTICE << "remove partial update info, key=" << key;
+    return removed;
+}
+
+// Delete, with a single meta remove() call, the infos for every
+// (tablet_id, partition_id, txn_id) triple in `keys`.
+Status RowsetMetaManager::remove_partial_update_infos(
+        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
+    std::vector<std::string> meta_keys;
+    meta_keys.reserve(keys.size());
+    for (const auto& [tablet_id, partition_id, txn_id] : keys) {
+        meta_keys.emplace_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
+                                           partition_id, txn_id));
+    }
+    Status removed = meta->remove(META_COLUMN_FAMILY_INDEX, meta_keys);
+    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << meta_keys.size();
+    return removed;
+}
+
+// Delete every persisted partial update info that belongs to `tablet_id`, for
+// any partition and txn. Returns the first iterate()/remove() error, if any.
+Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
+                                                                    int64_t tablet_id) {
+    // Keys are pui_{tablet_id}_{partition_id}_{txn_id}. The trailing '_' is
+    // required: without it the prefix "pui_12" also matches the keys of tablets
+    // 123, 1200, ... and would delete their partial update infos as well.
+    std::string prefix = fmt::format("{}{}_", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
+    std::vector<std::string> remove_keys;
+    auto get_remove_keys_func = [&](const std::string& key,
+                                    const std::string& /*value*/) -> bool {
+        remove_keys.emplace_back(key);
+        return true;
+    };
+    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
+    // Log after collecting; logging before iterate() always reported 0 keys.
+    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
+                << " removed keys size: " << remove_keys.size();
+    if (remove_keys.empty()) {
+        return Status::OK();
+    }
+    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset_meta_manager.h 
b/be/src/olap/rowset/rowset_meta_manager.h
index ddf33aa055a..d8cf9c37152 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -18,6 +18,8 @@
 #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
 #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
 
+#include <gen_cpp/olap_file.pb.h>
+
 #include <cstdint>
 #include <functional>
 #include <string>
@@ -32,6 +34,7 @@
 namespace doris {
 class OlapMeta;
 class RowsetMetaPB;
+class PartialUpdateInfoPB;
 } // namespace doris
 
 namespace doris {
@@ -76,6 +79,21 @@ public:
 
     static Status load_json_rowset_meta(OlapMeta* meta, const std::string& 
rowset_meta_path);
 
+    static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id, 
int64_t partition_id,
+                                           int64_t txn_id,
+                                           const PartialUpdateInfoPB& 
partial_update_info_pb);
+    static Status try_get_partial_update_info(OlapMeta* meta, int64_t 
tablet_id,
+                                              int64_t partition_id, int64_t 
txn_id,
+                                              PartialUpdateInfoPB* 
partial_update_info_pb);
+    static Status traverse_partial_update_info(
+            OlapMeta* meta,
+            std::function<bool(int64_t, int64_t, int64_t, std::string_view)> 
const& func);
+    static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                             int64_t partition_id, int64_t 
txn_id);
+    static Status remove_partial_update_infos(
+            OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, 
int64_t>>& keys);
+    static Status remove_tablet_related_partial_update_info(OlapMeta* meta, 
int64_t tablet_id);
+
 private:
     static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& 
rowset_id,
                         const RowsetMetaPB& rowset_meta_pb);
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 253e0389f56..5a85c45c524 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -815,6 +815,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool 
ignore_guard) {
     // cleand unused pending publish info for deleted tablet
     _clean_unused_pending_publish_info();
 
+    // clean unused partial update info for finished txns
+    _clean_unused_partial_update_info();
+
     // clean unused rowsets in remote storage backends
     for (auto data_dir : get_stores()) {
         data_dir->perform_remote_rowset_gc();
@@ -973,6 +976,34 @@ void StorageEngine::_clean_unused_pending_publish_info() {
     }
 }
 
+// Best-effort GC of persisted partial update infos, run from the trash sweep.
+// For each data dir, collect the entries whose tablet no longer exists, or whose
+// txn is NOT_FOUND, ABORTED or DELETED in the txn manager. Then delete them from
+// that same data dir's meta. Failures are logged; entries left behind are seen
+// again by the next call.
+void StorageEngine::_clean_unused_partial_update_info() {
+    std::vector<std::tuple<int64_t, int64_t, int64_t>> remove_infos;
+    auto unused_partial_update_info_collector =
+            [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id,
+                                  std::string_view /*value*/) -> bool {
+        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            remove_infos.emplace_back(tablet_id, partition_id, txn_id);
+            return true;
+        }
+        TxnState txn_state = _txn_manager->get_txn_state(
+                partition_id, txn_id, tablet_id, tablet->schema_hash(), tablet->tablet_uid());
+        if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED ||
+            txn_state == TxnState::DELETED) {
+            remove_infos.emplace_back(tablet_id, partition_id, txn_id);
+        }
+        return true;
+    };
+    auto data_dirs = get_stores();
+    for (auto* data_dir : data_dirs) {
+        // Collect per data dir. Otherwise every following dir would be sent
+        // deletes for all keys found in the previous dirs.
+        remove_infos.clear();
+        Status st = RowsetMetaManager::traverse_partial_update_info(
+                data_dir->get_meta(), unused_partial_update_info_collector);
+        if (!st.ok()) {
+            // still remove whatever was collected before the failure
+            LOG_WARNING("failed to traverse partial update info, err={}", st.to_string());
+        }
+        if (remove_infos.empty()) {
+            continue;
+        }
+        st = RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(), remove_infos);
+        if (!st.ok()) {
+            LOG_WARNING("failed to remove {} unused partial update infos, err={}",
+                        remove_infos.size(), st.to_string());
+        }
+    }
+}
+
 void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>& 
gc_tablet_infos) {
     for (auto [tablet_id, version] : gc_tablet_infos) {
         LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {}, 
version: {}", tablet_id,
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 94478097c7b..3dc59c19bed 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -264,6 +264,8 @@ private:
 
     void _clean_unused_pending_publish_info();
 
+    void _clean_unused_partial_update_info();
+
     Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
                      const int32_t expire);
 
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 1926738bafe..4d06ab36e97 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -33,8 +33,11 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/status.h"
 #include "olap/data_dir.h"
 #include "olap/delta_writer.h"
+#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/rowset_meta_manager.h"
 #include "olap/schema_change.h"
@@ -172,10 +175,11 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, 
TTransactionId transac
 
+// Tablet-based overload of commit_txn(). It takes the OlapMeta (via
+// tablet->data_dir()->get_meta()), tablet_id, schema_hash and tablet_uid from
+// `tablet`, then forwards everything, including the optional partial_update_info
+// (nullptr by default), to the OlapMeta-based overload.
 Status TxnManager::commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                               TTransactionId transaction_id, const PUniqueId& load_id,
-                              const RowsetSharedPtr& rowset_ptr, bool is_recovery) {
+                              const RowsetSharedPtr& rowset_ptr, bool is_recovery,
+                              std::shared_ptr<PartialUpdateInfo> partial_update_info) {
     return commit_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id,
                       tablet->tablet_id(), tablet->schema_hash(), tablet->tablet_uid(), load_id,
-                      rowset_ptr, is_recovery);
+                      rowset_ptr, is_recovery, partial_update_info);
 }
 
 Status TxnManager::publish_txn(TPartitionId partition_id, const 
TabletSharedPtr& tablet,
@@ -260,7 +264,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId 
partition_id,
                               TTransactionId transaction_id, TTabletId 
tablet_id,
                               SchemaHash schema_hash, TabletUid tablet_uid,
                               const PUniqueId& load_id, const RowsetSharedPtr& 
rowset_ptr,
-                              bool is_recovery) {
+                              bool is_recovery,
+                              std::shared_ptr<PartialUpdateInfo> 
partial_update_info) {
     if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
         LOG(WARNING) << "invalid commit req "
                      << " partition_id=" << partition_id << " transaction_id=" 
<< transaction_id
@@ -370,17 +375,54 @@ Status TxnManager::commit_txn(OlapMeta* meta, 
TPartitionId partition_id,
                     "txn id: {}",
                     rowset_ptr->rowset_id().to_string(), tablet_id, 
transaction_id);
         }
+
+        if (partial_update_info && partial_update_info->is_partial_update) {
+            PartialUpdateInfoPB partial_update_info_pb;
+            partial_update_info->to_pb(&partial_update_info_pb);
+            save_status = RowsetMetaManager::save_partial_update_info(
+                    meta, tablet_id, partition_id, transaction_id, 
partial_update_info_pb);
+            if (!save_status.ok()) {
+                save_status.append(fmt::format(", txn_id: {}", 
transaction_id));
+                return save_status;
+            }
+        }
+    }
+
+    TabletSharedPtr tablet;
+    std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
+    if (is_recovery) {
+        tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id,
+                                                                         
tablet_info.tablet_uid);
+        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
+            PartialUpdateInfoPB partial_update_info_pb;
+            auto st = RowsetMetaManager::try_get_partial_update_info(
+                    meta, tablet_id, partition_id, transaction_id, 
&partial_update_info_pb);
+            if (st.ok()) {
+                decoded_partial_update_info = 
std::make_shared<PartialUpdateInfo>();
+                decoded_partial_update_info->from_pb(&partial_update_info_pb);
+                DCHECK(decoded_partial_update_info->is_partial_update);
+            } else if (!st.is<META_KEY_NOT_FOUND>()) {
+                // the load is not a partial update
+                return st;
+            }
+        }
     }
 
     {
         std::lock_guard<std::shared_mutex> 
wrlock(_get_txn_map_lock(transaction_id));
         TabletTxnInfo load_info(load_id, rowset_ptr);
         if (is_recovery) {
-            TabletSharedPtr tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(
-                    tablet_info.tablet_id, tablet_info.tablet_uid);
             if (tablet != nullptr && 
tablet->enable_unique_key_merge_on_write()) {
                 load_info.unique_key_merge_on_write = true;
                 load_info.delete_bitmap.reset(new 
DeleteBitmap(tablet->tablet_id()));
+                if (decoded_partial_update_info) {
+                    LOG_INFO(
+                            "get partial update info from RocksDB during 
recovery. txn_id={}, "
+                            "partition_id={}, tablet_id={}, 
partial_update_info=[{}]",
+                            transaction_id, partition_id, tablet_id,
+                            decoded_partial_update_info->summary());
+                    load_info.partial_update_info = 
decoded_partial_update_info;
+                }
             }
         }
         load_info.commit();
@@ -515,6 +557,20 @@ Status TxnManager::publish_txn(OlapMeta* meta, 
TPartitionId partition_id,
                 rowset->rowset_id().to_string(), tablet_id, transaction_id);
     }
 
+    if (tablet_txn_info.unique_key_merge_on_write && 
tablet_txn_info.partial_update_info &&
+        tablet_txn_info.partial_update_info->is_partial_update) {
+        status = RowsetMetaManager::remove_partial_update_info(meta, 
tablet_id, partition_id,
+                                                               transaction_id);
+        if (!status) {
+            // discard the error status and print the warning log
+            LOG_WARNING(
+                    "fail to remove partial update info from RocksDB. 
txn_id={}, rowset_id={}, "
+                    "tablet_id={}, tablet_uid={}",
+                    transaction_id, rowset->rowset_id().to_string(), tablet_id,
+                    tablet_uid.to_string());
+        }
+    }
+
     // TODO(Drogon): remove these test codes
     if (enable_binlog) {
         auto version_str = fmt::format("{}", version.first);
@@ -696,6 +752,13 @@ void 
TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
             }
         }
     }
+    if (meta != nullptr) {
+        Status st = 
RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
+        if (!st.ok()) {
+            LOG_WARNING("failed to partial update info, tablet_id={}, err={}", 
tablet_id,
+                        st.to_string());
+        }
+    }
 }
 
 void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 0a8d0ee3026..c49441459bf 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -50,6 +50,7 @@ namespace doris {
 class DeltaWriter;
 class OlapMeta;
 struct TabletPublishStatistics;
+struct PartialUpdateInfo;
 
 enum class TxnState {
     NOT_FOUND = 0,
@@ -144,7 +145,8 @@ public:
 
     Status commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
                       TTransactionId transaction_id, const PUniqueId& load_id,
-                      const RowsetSharedPtr& rowset_ptr, bool is_recovery);
+                      const RowsetSharedPtr& rowset_ptr, bool is_recovery,
+                      std::shared_ptr<PartialUpdateInfo> partial_update_info = 
nullptr);
 
     Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& 
tablet,
                        TTransactionId transaction_id, const Version& version,
@@ -159,8 +161,8 @@ public:
 
     Status commit_txn(OlapMeta* meta, TPartitionId partition_id, 
TTransactionId transaction_id,
                       TTabletId tablet_id, SchemaHash schema_hash, TabletUid 
tablet_uid,
-                      const PUniqueId& load_id, const RowsetSharedPtr& 
rowset_ptr,
-                      bool is_recovery);
+                      const PUniqueId& load_id, const RowsetSharedPtr& 
rowset_ptr, bool is_recovery,
+                      std::shared_ptr<PartialUpdateInfo> partial_update_info = 
nullptr);
 
     // remove a txn from txn manager
     // not persist rowset meta because
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index c633248f21a..ba897ea21b2 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -359,3 +359,18 @@ message RowsetBinlogMetasPB {
 
     repeated RowsetBinlogMetaPB rowset_binlog_metas = 1;
 }
+
+// Serialized form of the BE's PartialUpdateInfo (written by to_pb, read by
+// from_pb). It is stored in the BE meta store when a partial-update txn commits,
+// so the info can be recovered if the BE restarts before the txn is published.
+message PartialUpdateInfoPB {
+    optional bool is_partial_update = 1 [default = false];
+    // names of the columns supplied by the load
+    repeated string partial_update_input_columns = 2;
+    // ordinals of tablet columns NOT supplied by the load
+    repeated uint32 missing_cids = 3;
+    // ordinals of tablet columns supplied by the load
+    repeated uint32 update_cids = 4;
+    // false when some missing column is neither nullable nor has a default value
+    optional bool can_insert_new_rows_in_partial_update = 5 [default = false];
+    optional bool is_strict_mode = 6 [default = false];
+    // load timestamp; used to materialize CURRENT_TIMESTAMP / CURRENT_DATE defaults
+    optional int64 timestamp_ms = 7 [default = 0];
+    // timezone used together with timestamp_ms for those defaults
+    optional string timezone = 8;
+    // NOTE(review): fields 9 and 10 are neither written nor read by
+    // PartialUpdateInfo::to_pb/from_pb in this change; presumably reserved for
+    // compatibility with other branches. Confirm before relying on them.
+    optional bool is_input_columns_contains_auto_inc_column = 9 [default = false];
+    optional bool is_schema_contains_auto_inc_column = 10 [default = false];
+    // one entry per missing_cids element, in the same order; empty string when
+    // the column has no default value
+    repeated string default_values = 11;
+    // from_pb also falls back to -1 when this field is absent
+    optional int64 max_version_in_flush_phase = 12 [default = -1];
+}
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/data1.csv 
b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
new file mode 100644
index 00000000000..be9f2feb69a
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
@@ -0,0 +1,2 @@
+1,10,10
+3,30,30
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
new file mode 100644
index 00000000000..6444b41c2c2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+
+-- !sql --
+1      1       1       1       1
+2      2       2       2       2
+3      3       3       3       3
+
+-- !sql --
+1      1       1       99      99
+2      2       2       88      88
+3      3       3       77      77
+
+-- !sql --
+1      10      10      99      99
+2      2       2       88      88
+3      30      30      77      77
+
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
new file mode 100644
index 00000000000..bc2a44425b3
--- /dev/null
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
@@ -0,0 +1,156 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_partial_update_conflict_be_restart") { // a partial update committed on the BE before a restart must still publish correctly after it
+    def dbName = context.config.getDbNameByFile(context.file)
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = false
+    docker(options) {
+        def table1 = "test_partial_update_conflict_be_restart"
+        sql "DROP TABLE IF EXISTS ${table1};"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int,
+                `c3` int,
+                `c4` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
+        order_qt_sql "select * from ${table1};"
+
+        def do_streamload_2pc_commit = { txnId -> // commits a two-phase stream load txn via the FE _stream_load_2pc API
+            def feNode = sql_return_maparray("show frontends;").get(0)
+            def command = "curl -X PUT --location-trusted -u root:" +
+                    " -H txn_id:${txnId}" +
+                    " -H txn_operation:commit" +
+                    " http://${feNode.Host}:${feNode.HttpPort}/api/${dbName}/${table1}/_stream_load_2pc";
+            log.info("http_stream execute 2pc: ${command}")
+
+            def process = command.execute()
+            def out = process.text // drain stdout before waitFor() so a full pipe cannot stall curl
+            def code = process.waitFor()
+            log.info("http_stream 2pc result: ${out}".toString())
+            def json2pc = parseJson(out)
+            assertEquals(0, code)
+            assertEquals("success", json2pc.status.toLowerCase())
+        }
+
+        def wait_for_publish = {txnId, waitSecond -> // polls once per second until VISIBLE/ABORTED or waitSecond runs out
+            String st = "PREPARE"
+            while (!st.equalsIgnoreCase("VISIBLE") && !st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+                Thread.sleep(1000)
+                waitSecond -= 1
+                def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
+                assertTrue(result != null && !result.isEmpty())
+                st = result[0].TransactionStatus
+            }
+            log.info("Stream load with txn ${txnId} is ${st}")
+            assertEquals("VISIBLE", st)
+        }
+
+        String txnId1
+        streamLoad {
+            table "${table1}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            set 'partial_columns', 'true'
+            set 'columns', 'k1,c1,c2'
+            set 'strict_mode', "false"
+            set 'two_phase_commit', 'true'
+            file 'data1.csv'
+            time 10000 // limit inflight 10s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                txnId1 = json.TxnId
+                assertEquals("success", json.Status.toLowerCase())
+            }
+        }
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+
+        // another partial update that conflicts with the previous load and publishes successfully
+        sql "set enable_unique_key_partial_update=true;"
+        sql "sync;"
+        sql "insert into ${table1}(k1,c3,c4) values(1, 99, 99),(2,88,88),(3,77,77);"
+        sql "set enable_unique_key_partial_update=false;"
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+
+        // restart backend
+        cluster.restartBackends()
+        Thread.sleep(5000)
+
+        // wait for be restart
+        boolean ok = false
+        int cnt = 0
+        for (; cnt < 10; cnt++) {
+            def be = sql_return_maparray("show backends").get(0)
+            if (be.Alive.toBoolean()) {
+                ok = true
+                break
+            }
+            logger.info("wait for BE restart...")
+            Thread.sleep(1000)
+        }
+        if (!ok) {
+            logger.info("BE failed to restart")
+            assertTrue(false)
+        }
+
+        Thread.sleep(5000)
+
+        do_streamload_2pc_commit(txnId1)
+        wait_for_publish(txnId1, 10)
+
+        // the pending load's c1,c2 must merge with (not overwrite) the c3,c4 published above
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+        sql "DROP TABLE IF EXISTS ${table1};"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to