This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f31c1d858a2 [fix](merge-on-write) fix duplicate key in schema change 
(#25705)
f31c1d858a2 is described below

commit f31c1d858a2e4efdceda91228084b81fcab84439
Author: Xin Liao <[email protected]>
AuthorDate: Wed Oct 25 18:59:48 2023 +0800

    [fix](merge-on-write) fix duplicate key in schema change (#25705)
    
    It should be ensured that the obtained rowset versions are continuous when
    calculating the delete bitmap during publish.
    A NOTREADY tablet left over from a failed schema change should be dropped.
    When a rowset is deleted, its delete bitmap must not be removed until no
    read requests are still using that rowset.
---
 be/src/olap/full_compaction.cpp                  |  3 +--
 be/src/olap/rowset_builder.cpp                   | 16 +++---------
 be/src/olap/schema_change.cpp                    | 13 +++-------
 be/src/olap/storage_engine.cpp                   |  7 ++++++
 be/src/olap/tablet.cpp                           | 32 ++++++++++++++----------
 be/src/olap/tablet.h                             |  2 +-
 be/src/olap/tablet_meta.cpp                      |  5 ----
 be/src/olap/task/engine_clone_task.cpp           | 10 ++++++++
 be/src/olap/task/engine_publish_version_task.cpp | 32 ++++++++++++++++--------
 9 files changed, 66 insertions(+), 54 deletions(-)

diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 393bbd8c579..246056794c9 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -147,8 +147,7 @@ Status 
FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP
     std::vector<RowsetSharedPtr> tmp_rowsets {};
 
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (_tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+    if (_tablet->tablet_state() == TABLET_NOTREADY) {
         LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
                   << _tablet->tablet_id();
         return Status::OK();
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 85b72683007..afe7fd385ff 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -112,8 +112,7 @@ Status 
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
     std::lock_guard<std::shared_mutex> lck(tablet->get_header_lock());
     int64_t cur_max_version = tablet->max_version_unlocked().second;
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
         // Disable 'partial_update' when the tablet is undergoing a 'schema 
changing process'
         if (_req.table_schema_param->is_partial_update()) {
             return Status::InternalError(
@@ -122,7 +121,7 @@ Status 
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
         }
         _rowset_ids.clear();
     } else {
-        _rowset_ids = tablet->all_rs_id(cur_max_version);
+        RETURN_IF_ERROR(tablet->all_rs_id(cur_max_version, &_rowset_ids));
     }
     _delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
     mow_context =
@@ -238,8 +237,7 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_submit_delete_bitmap_timer);
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(tablet->tablet_id())) {
+    if (tablet->tablet_state() == TABLET_NOTREADY) {
         LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
                      "tablet_id: "
                   << tablet->tablet_id() << " txn_id: " << _req.txn_id;
@@ -248,11 +246,6 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
     auto beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
-    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (tablet->tablet_state() == TABLET_NOTREADY &&
-        SchemaChangeHandler::tablet_in_converting(tablet->tablet_id())) {
-        return Status::OK();
-    }
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
         RETURN_IF_ERROR(
@@ -293,8 +286,7 @@ Status RowsetBuilder::commit_txn() {
     auto tablet = static_cast<Tablet*>(_tablet.get());
     if (tablet->enable_unique_key_merge_on_write() &&
         config::enable_merge_on_write_correctness_check && _rowset->num_rows() 
!= 0 &&
-        !(tablet->tablet_state() == TABLET_NOTREADY &&
-          SchemaChangeHandler::tablet_in_converting(tablet->tablet_id()))) {
+        tablet->tablet_state() != TABLET_NOTREADY) {
         auto st = tablet->check_delete_bitmap_correctness(
                 _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, 
_rowset_ids);
         if (!st.ok()) {
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 2e732b27ac2..e331564d9d2 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -912,12 +912,9 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             _tablet_ids_in_converting.insert(new_tablet->tablet_id());
         }
         res = _convert_historical_rowsets(sc_params);
-        if (new_tablet->keys_type() != UNIQUE_KEYS ||
-            !new_tablet->enable_unique_key_merge_on_write() || !res) {
-            {
-                std::lock_guard<std::shared_mutex> wrlock(_mutex);
-                _tablet_ids_in_converting.erase(new_tablet->tablet_id());
-            }
+        {
+            std::lock_guard<std::shared_mutex> wrlock(_mutex);
+            _tablet_ids_in_converting.erase(new_tablet->tablet_id());
         }
         if (!res) {
             break;
@@ -977,10 +974,6 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             }
 
             // step 4
-            {
-                std::lock_guard<std::shared_mutex> wrlock(_mutex);
-                _tablet_ids_in_converting.erase(new_tablet->tablet_id());
-            }
             res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
             if (!res) {
                 break;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index f17a6de8414..3bdc6967af1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1081,6 +1081,13 @@ void StorageEngine::start_delete_unused_rowset() {
         VLOG_NOTICE << "start to remove rowset:" << it->second->rowset_id()
                     << ", version:" << it->second->version().first << "-"
                     << it->second->version().second;
+        auto tablet_id = it->second->rowset_meta()->tablet_id();
+        auto tablet = _tablet_manager->get_tablet(tablet_id);
+        // delete delete_bitmap of unused rowsets
+        if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
+            
tablet->tablet_meta()->delete_bitmap().remove({it->second->rowset_id(), 0, 0},
+                                                          
{it->second->rowset_id(), UINT32_MAX, 0});
+        }
         Status status = it->second->remove();
         VLOG_NOTICE << "remove rowset:" << it->second->rowset_id()
                     << " finished. status:" << status;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 937a7d20533..2dbcd46751e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3300,7 +3300,8 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
                   << tablet_id() << " cur max_version: " << cur_version;
         return Status::OK();
     }
-    RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1);
+    RowsetIdUnorderedSet cur_rowset_ids;
+    RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
     DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
     RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, 
delete_bitmap));
 
@@ -3349,7 +3350,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
     {
         std::shared_lock meta_rlock(_meta_lock);
         cur_version = max_version_unlocked().second;
-        cur_rowset_ids = all_rs_id(cur_version);
+        RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids));
         _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, 
&rowset_ids_to_add,
                                &rowset_ids_to_del);
         specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
@@ -3390,13 +3391,12 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     {
         std::shared_lock meta_rlock(_meta_lock);
         // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-        if (tablet_state() == TABLET_NOTREADY &&
-            SchemaChangeHandler::tablet_in_converting(tablet_id())) {
+        if (tablet_state() == TABLET_NOTREADY) {
             LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
                       << tablet_id();
             return Status::OK();
         }
-        cur_rowset_ids = all_rs_id(cur_version - 1);
+        RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
     }
     auto t2 = watch.get_elapse_time_us();
 
@@ -3432,7 +3432,7 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
               << ", rowset_ids to add: " << rowset_ids_to_add.size()
               << ", rowset_ids to del: " << rowset_ids_to_del.size()
-              << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id << ","
+              << ", cur version: " << cur_version << ", transaction_id: " << 
txn_id << ","
               << ss.str() << " , total rows: " << total_rows;
 
     if (config::enable_merge_on_write_correctness_check && rowset->num_rows() 
!= 0) {
@@ -3564,18 +3564,24 @@ Status Tablet::check_rowid_conversion(
     return Status::OK();
 }
 
-RowsetIdUnorderedSet Tablet::all_rs_id(int64_t max_version) const {
-    RowsetIdUnorderedSet rowset_ids;
-    for (const auto& rs_it : _rs_version_map) {
-        if (rs_it.first.second == 1) {
+Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const {
+    //  Ensure that the obtained versions of rowsets are continuous
+    std::vector<Version> version_path;
+    RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version), 
&version_path));
+    for (auto& ver : version_path) {
+        if (ver.second == 1) {
             // [0-1] rowset is empty for each tablet, skip it
             continue;
         }
-        if (rs_it.first.second <= max_version) {
-            rowset_ids.insert(rs_it.second->rowset_id());
+        auto it = _rs_version_map.find(ver);
+        if (it == _rs_version_map.end()) {
+            return Status::Error<CAPTURE_ROWSET_ERROR, false>(
+                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
+                    ver.to_string());
         }
+        rowset_ids->emplace(it->second->rowset_id());
     }
-    return rowset_ids;
+    return Status::OK();
 }
 
 bool Tablet::check_all_rowset_segment() {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 4a074952ab9..58a2c7f8f6c 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -502,7 +502,7 @@ public:
             RowsetSharedPtr dst_rowset,
             const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, 
RowLocation>>>&
                     location_map);
-    RowsetIdUnorderedSet all_rs_id(int64_t max_version) const;
+    Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) 
const;
     void sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block);
 
     bool check_all_rowset_segment();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 201ffd0a2d2..b5f3b4fd36d 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -747,11 +747,6 @@ void TabletMeta::modify_rs_metas(const 
std::vector<RowsetMetaSharedPtr>& to_add,
                 ++it;
             }
         }
-        // delete delete_bitmap of to_delete's rowsets if not added to 
_stale_rs_metas.
-        if (same_version && _enable_unique_key_merge_on_write) {
-            delete_bitmap().remove({rs_to_del->rowset_id(), 0, 0},
-                                   {rs_to_del->rowset_id(), UINT32_MAX, 0});
-        }
     }
     if (!same_version) {
         // put to_delete rowsets in _stale_rs_metas.
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index dddec4daba8..6edc746da3a 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -116,6 +116,16 @@ Status EngineCloneTask::_do_clone() {
     // Check local tablet exist or not
     TabletSharedPtr tablet =
             
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
+
+    // The status of a tablet is not ready, indicating that it is a residual 
tablet after a schema
+    // change failure. It should not provide normal read and write, so drop it 
here.
+    if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
+        LOG(WARNING) << "tablet state is not ready when clone, need to drop 
old tablet, tablet_id="
+                     << tablet->tablet_id();
+        
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
+                tablet->tablet_id(), tablet->replica_id(), false));
+        tablet.reset();
+    }
     bool is_new_tablet = tablet == nullptr;
     // try to incremental clone
     std::vector<Version> missed_versions;
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index c5a8c32b84c..f9e013667bc 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -152,26 +152,24 @@ Status EnginePublishVersionTask::finish() {
                     
StorageEngine::instance()->txn_manager()->update_tablet_version_txn(
                             tablet_info.tablet_id, version.second, 
transaction_id);
                 }
-                Version max_version;
+                int64_t max_version;
                 TabletState tablet_state;
                 {
                     std::shared_lock rdlock(tablet->get_header_lock());
-                    max_version = tablet->max_version_unlocked();
+                    max_version = tablet->max_version_unlocked().second;
                     tablet_state = tablet->tablet_state();
                 }
-                if (tablet_state == TabletState::TABLET_RUNNING &&
-                    version.first != max_version.second + 1) {
-                    // If a tablet migrates out and back, the previously failed
-                    // publish task may retry on the new tablet, so check
-                    // whether the version exists. if not exist, then set
-                    // publish failed
-                    if (!tablet->check_version_exist(version)) {
+                if (version.first != max_version + 1) {
+                    if (tablet->check_version_exist(version)) {
+                        continue;
+                    }
+                    auto handle_version_not_continuous = [&]() {
                         add_error_tablet_id(tablet_info.tablet_id);
                         _discontinuous_version_tablets->emplace_back(
                                 partition_id, tablet_info.tablet_id, 
version.first);
                         res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
                                 "check_version_exist failed");
-                        int64_t missed_version = max_version.second + 1;
+                        int64_t missed_version = max_version + 1;
                         int64_t missed_txn_id =
                                 
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
                                         tablet->tablet_id(), missed_version);
@@ -186,8 +184,20 @@ Status EnginePublishVersionTask::finish() {
                         } else {
                             LOG_EVERY_SECOND(INFO) << msg;
                         }
+                    };
+                    // The versions during the schema change period need to be 
also continuous
+                    if (tablet_state == TabletState::TABLET_NOTREADY) {
+                        Version max_continuous_version = {-1, 0};
+                        
tablet->max_continuous_version_from_beginning(&max_continuous_version);
+                        if (max_version > 1 && version.first > max_version &&
+                            max_continuous_version.second != max_version) {
+                            handle_version_not_continuous();
+                            continue;
+                        }
+                    } else {
+                        handle_version_not_continuous();
+                        continue;
                     }
-                    continue;
                 }
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to