This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6f8ea15a729 [fix](merge-on-write) fix duplicate key in schema change (#25705) (#25914)
6f8ea15a729 is described below

commit 6f8ea15a729c13c884e146298d5e9100ac535d29
Author: Xin Liao <[email protected]>
AuthorDate: Wed Oct 25 21:18:03 2023 +0800

    [fix](merge-on-write) fix duplicate key in schema change (#25705) (#25914)
---
 be/src/olap/delta_writer.cpp                     | 16 +++---------
 be/src/olap/full_compaction.cpp                  |  3 +--
 be/src/olap/schema_change.cpp                    | 13 +++-------
 be/src/olap/storage_engine.cpp                   |  7 ++++++
 be/src/olap/tablet.cpp                           | 32 ++++++++++++++----------
 be/src/olap/tablet.h                             |  2 +-
 be/src/olap/tablet_meta.cpp                      |  5 ----
 be/src/olap/task/engine_clone_task.cpp           | 10 ++++++++
 be/src/olap/task/engine_publish_version_task.cpp | 32 ++++++++++++++++--------
 9 files changed, 66 insertions(+), 54 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 87316c3cfba..3634e19c68c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -164,8 +164,7 @@ Status DeltaWriter::init() {
         std::lock_guard<std::shared_mutex> lck(_tablet->get_header_lock());
         _cur_max_version = _tablet->max_version_unlocked().second;
         // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-        if (_tablet->tablet_state() == TABLET_NOTREADY &&
-            SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+        if (_tablet->tablet_state() == TABLET_NOTREADY) {
             // Disable 'partial_update' when the tablet is undergoing a 
'schema changing process'
             if (_req.table_schema_param->is_partial_update()) {
                 return Status::InternalError(
@@ -174,7 +173,7 @@ Status DeltaWriter::init() {
             }
             _rowset_ids.clear();
         } else {
-            _rowset_ids = _tablet->all_rs_id(_cur_max_version);
+            RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, 
&_rowset_ids));
         }
     }
 
@@ -459,8 +458,7 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() {
 
     std::lock_guard<std::mutex> l(_lock);
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (_tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+    if (_tablet->tablet_state() == TABLET_NOTREADY) {
         LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
                      "tablet_id: "
                   << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
@@ -469,11 +467,6 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() {
     auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
-    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (_tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
-        return Status::OK();
-    }
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
         
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, 
segments,
@@ -510,8 +503,7 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes& 
slave_tablet_nodes,
                                const bool write_single_replica) {
     if (_tablet->enable_unique_key_merge_on_write() &&
         config::enable_merge_on_write_correctness_check && 
_cur_rowset->num_rows() != 0 &&
-        !(_tablet->tablet_state() == TABLET_NOTREADY &&
-          SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id()))) {
+        _tablet->tablet_state() != TABLET_NOTREADY) {
         auto st = _tablet->check_delete_bitmap_correctness(
                 _delete_bitmap, _cur_rowset->end_version() - 1, _req.txn_id, 
_rowset_ids);
         if (!st.ok()) {
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 9b832ec5552..e95d6750ad3 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -147,8 +147,7 @@ Status 
FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP
     std::vector<RowsetSharedPtr> tmp_rowsets {};
 
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (_tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+    if (_tablet->tablet_state() == TABLET_NOTREADY) {
         LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
                   << _tablet->tablet_id();
         return Status::OK();
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 740255b4325..acd468a47e8 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -916,12 +916,9 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             _tablet_ids_in_converting.insert(new_tablet->tablet_id());
         }
         res = _convert_historical_rowsets(sc_params);
-        if (new_tablet->keys_type() != UNIQUE_KEYS ||
-            !new_tablet->enable_unique_key_merge_on_write() || !res) {
-            {
-                std::lock_guard<std::shared_mutex> wrlock(_mutex);
-                _tablet_ids_in_converting.erase(new_tablet->tablet_id());
-            }
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_mutex);
+            _tablet_ids_in_converting.erase(new_tablet->tablet_id());
         }
         if (!res) {
             break;
@@ -981,10 +978,6 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             }
 
             // step 4
-            {
-                std::lock_guard<std::shared_mutex> wrlock(_mutex);
-                _tablet_ids_in_converting.erase(new_tablet->tablet_id());
-            }
             res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
             if (!res) {
                 break;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 52d5072f7d3..a4ecf33a0bb 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1082,6 +1082,13 @@ void StorageEngine::start_delete_unused_rowset() {
         VLOG_NOTICE << "start to remove rowset:" << it->second->rowset_id()
                     << ", version:" << it->second->version().first << "-"
                     << it->second->version().second;
+        auto tablet_id = it->second->rowset_meta()->tablet_id();
+        auto tablet = _tablet_manager->get_tablet(tablet_id);
+        // delete delete_bitmap of unused rowsets
+        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
+            
tablet->tablet_meta()->delete_bitmap().remove({it->second->rowset_id(), 0, 0},
+                                                          
{it->second->rowset_id(), UINT32_MAX, 0});
+        }
         Status status = it->second->remove();
         VLOG_NOTICE << "remove rowset:" << it->second->rowset_id()
                     << " finished. status:" << status;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 527a8b1c372..8a18f0d1d1b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3320,7 +3320,8 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
                   << tablet_id() << " cur max_version: " << cur_version;
         return Status::OK();
     }
-    RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1);
+    RowsetIdUnorderedSet cur_rowset_ids;
+    RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
     DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
     RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, 
delete_bitmap));
 
@@ -3370,7 +3371,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
     {
         std::shared_lock meta_rlock(_meta_lock);
         cur_version = max_version_unlocked().second;
-        cur_rowset_ids = all_rs_id(cur_version);
+        RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids));
         _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, 
&rowset_ids_to_add,
                                &rowset_ids_to_del);
         specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
@@ -3411,13 +3412,12 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     {
         std::shared_lock meta_rlock(_meta_lock);
         // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-        if (tablet_state() == TABLET_NOTREADY &&
-            SchemaChangeHandler::tablet_in_converting(tablet_id())) {
+        if (tablet_state() == TABLET_NOTREADY) {
             LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
                       << tablet_id();
             return Status::OK();
         }
-        cur_rowset_ids = all_rs_id(cur_version - 1);
+        RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
     }
     auto t2 = watch.get_elapse_time_us();
 
@@ -3454,7 +3454,7 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
               << ", rowset_ids to add: " << rowset_ids_to_add.size()
               << ", rowset_ids to del: " << rowset_ids_to_del.size()
-              << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id << ","
+              << ", cur version: " << cur_version << ", transaction_id: " << 
txn_id << ","
               << ss.str() << " , total rows: " << total_rows;
 
     if (config::enable_merge_on_write_correctness_check && rowset->num_rows() 
!= 0) {
@@ -3586,18 +3586,24 @@ Status Tablet::check_rowid_conversion(
     return Status::OK();
 }
 
-RowsetIdUnorderedSet Tablet::all_rs_id(int64_t max_version) const {
-    RowsetIdUnorderedSet rowset_ids;
-    for (const auto& rs_it : _rs_version_map) {
-        if (rs_it.first.second == 1) {
+Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const {
+    //  Ensure that the obtained versions of rowsets are continuous
+    std::vector<Version> version_path;
+    RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version), 
&version_path));
+    for (auto& ver : version_path) {
+        if (ver.second == 1) {
             // [0-1] rowset is empty for each tablet, skip it
             continue;
         }
-        if (rs_it.first.second <= max_version) {
-            rowset_ids.insert(rs_it.second->rowset_id());
+        auto it = _rs_version_map.find(ver);
+        if (it == _rs_version_map.end()) {
+            return Status::Error<CAPTURE_ROWSET_ERROR, false>(
+                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
+                    ver.to_string());
         }
+        rowset_ids->emplace(it->second->rowset_id());
     }
-    return rowset_ids;
+    return Status::OK();
 }
 
 bool Tablet::check_all_rowset_segment() {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 9e665927d31..7fbf2b9a98f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -508,7 +508,7 @@ public:
             RowsetSharedPtr dst_rowset,
             const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, 
RowLocation>>>&
                     location_map);
-    RowsetIdUnorderedSet all_rs_id(int64_t max_version) const;
+    Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) 
const;
     void sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block);
 
     bool check_all_rowset_segment();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index b407e35a449..69ea6c8d4e8 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -765,11 +765,6 @@ void TabletMeta::modify_rs_metas(const 
std::vector<RowsetMetaSharedPtr>& to_add,
                 ++it;
             }
         }
-        // delete delete_bitmap of to_delete's rowsets if not added to 
_stale_rs_metas.
-        if (same_version && _enable_unique_key_merge_on_write) {
-            delete_bitmap().remove({rs_to_del->rowset_id(), 0, 0},
-                                   {rs_to_del->rowset_id(), UINT32_MAX, 0});
-        }
     }
     if (!same_version) {
         // put to_delete rowsets in _stale_rs_metas.
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 4eb43864bec..3621a958eff 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -111,6 +111,16 @@ Status EngineCloneTask::_do_clone() {
     // Check local tablet exist or not
     TabletSharedPtr tablet =
             
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
+
+    // The status of a tablet is not ready, indicating that it is a residual 
tablet after a schema
+    // change failure. It should not provide normal read and write, so drop it 
here.
+    if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
+        LOG(WARNING) << "tablet state is not ready when clone, need to drop 
old tablet, tablet_id="
+                     << tablet->tablet_id();
+        
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
+                tablet->tablet_id(), tablet->replica_id(), false));
+        tablet.reset();
+    }
     bool is_new_tablet = tablet == nullptr;
     // try to incremental clone
     std::vector<Version> missed_versions;
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index 702c4386f11..d6642705baf 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -153,26 +153,24 @@ Status EnginePublishVersionTask::finish() {
                     
StorageEngine::instance()->txn_manager()->update_tablet_version_txn(
                             tablet_info.tablet_id, version.second, 
transaction_id);
                 }
-                Version max_version;
+                int64_t max_version;
                 TabletState tablet_state;
                 {
                     std::shared_lock rdlock(tablet->get_header_lock());
-                    max_version = tablet->max_version_unlocked();
+                    max_version = tablet->max_version_unlocked().second;
                     tablet_state = tablet->tablet_state();
                 }
-                if (tablet_state == TabletState::TABLET_RUNNING &&
-                    version.first != max_version.second + 1) {
-                    // If a tablet migrates out and back, the previously failed
-                    // publish task may retry on the new tablet, so check
-                    // whether the version exists. if not exist, then set
-                    // publish failed
-                    if (!tablet->check_version_exist(version)) {
+                if (version.first != max_version + 1) {
+                    if (tablet->check_version_exist(version)) {
+                        continue;
+                    }
+                    auto handle_version_not_continuous = [&]() {
                         add_error_tablet_id(tablet_info.tablet_id);
                         _discontinuous_version_tablets->emplace_back(
                                 partition_id, tablet_info.tablet_id, 
version.first);
                         res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
                                 "check_version_exist failed");
-                        int64_t missed_version = max_version.second + 1;
+                        int64_t missed_version = max_version + 1;
                         int64_t missed_txn_id =
                                 
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
                                         tablet->tablet_id(), missed_version);
@@ -187,8 +185,20 @@ Status EnginePublishVersionTask::finish() {
                         } else {
                             LOG_EVERY_SECOND(INFO) << msg;
                         }
+                    };
+                    // The versions during the schema change period need to be 
also continuous
+                    if (tablet_state == TabletState::TABLET_NOTREADY) {
+                        Version max_continuous_version = {-1, 0};
+                        
tablet->max_continuous_version_from_beginning(&max_continuous_version);
+                        if (max_version > 1 && version.first > max_version &&
+                            max_continuous_version.second != max_version) {
+                            handle_version_not_continuous();
+                            continue;
+                        }
+                    } else {
+                        handle_version_not_continuous();
+                        continue;
                     }
-                    continue;
                 }
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to