This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 797238cbb71 [fix](merge-on-write) fix schema change may result in delete bitmap incorrect (#29386)
797238cbb71 is described below

commit 797238cbb71572e5d5b5a0bea778b2020dbe3e4a
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jan 2 23:45:04 2024 +0800

    [fix](merge-on-write) fix schema change may result in delete bitmap incorrect (#29386)
---
 be/src/olap/schema_change.cpp          | 137 +++++++++++++++++++--------------
 be/src/olap/schema_change.h            |   6 +-
 be/src/olap/tablet.cpp                 |  22 ++++--
 be/src/olap/tablet.h                   |   4 +
 be/src/olap/task/engine_clone_task.cpp |   9 ++-
 5 files changed, 111 insertions(+), 67 deletions(-)

diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index d19d399b1ef..459b1931aea 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -62,6 +62,7 @@
 #include "olap/wrapper_field.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 #include "util/trace.h"
 #include "vec/aggregate_functions/aggregate_function.h"
@@ -711,6 +712,13 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                   << " res=" << res;
         return res;
     }
+    new_tablet->set_alter_failed(false);
+    Defer defer([&new_tablet] {
+        // if tablet state is not TABLET_RUNNING when return, indicates that 
alter has failed.
+        if (new_tablet->tablet_state() != TABLET_RUNNING) {
+            new_tablet->set_alter_failed(true);
+        }
+    });
 
     LOG(INFO) << "finish to validate alter tablet request. begin to convert 
data from base tablet "
                  "to new tablet"
@@ -918,7 +926,8 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             std::lock_guard<std::shared_mutex> wrlock(_mutex);
             _tablet_ids_in_converting.insert(new_tablet->tablet_id());
         }
-        res = _convert_historical_rowsets(sc_params);
+        int64_t real_alter_version = 0;
+        res = _convert_historical_rowsets(sc_params, &real_alter_version);
         {
             std::lock_guard<std::shared_mutex> wrlock(_mutex);
             _tablet_ids_in_converting.erase(new_tablet->tablet_id());
@@ -927,65 +936,12 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             break;
         }
 
-        // For unique with merge-on-write table, should process delete bitmap 
here.
-        // 1. During double write, the newly imported rowsets does not 
calculate
-        // delete bitmap and publish successfully.
-        // 2. After conversion, calculate delete bitmap for the rowsets 
imported
-        // during double write. During this period, new data can still be 
imported
-        // witout calculating delete bitmap and publish successfully.
-        // 3. Block the new publish, calculate the delete bitmap of the
-        // incremental rowsets.
-        // 4. Switch the tablet status to TABLET_RUNNING. The newly imported
-        // data will calculate delete bitmap.
         if (new_tablet->keys_type() == UNIQUE_KEYS &&
             new_tablet->enable_unique_key_merge_on_write()) {
-            // step 2
-            int64_t max_version = new_tablet->max_version().second;
-            std::vector<RowsetSharedPtr> rowsets;
-            if (end_version < max_version) {
-                LOG(INFO)
-                        << "alter table for unique with merge-on-write, 
calculate delete bitmap of "
-                        << "double write rowsets for version: " << end_version 
+ 1 << "-"
-                        << max_version;
-                RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
-                        {end_version + 1, max_version}, &rowsets));
-            }
-            for (auto rowset_ptr : rowsets) {
-                if (rowset_ptr->version().second <= end_version) {
-                    continue;
-                }
-                std::lock_guard<std::mutex> 
rwlock(new_tablet->get_rowset_update_lock());
-                std::shared_lock<std::shared_mutex> 
wrlock(new_tablet->get_header_lock());
-                
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
-            }
-
-            // step 3
-            std::lock_guard<std::mutex> 
rwlock(new_tablet->get_rowset_update_lock());
-            std::lock_guard<std::shared_mutex> 
new_wlock(new_tablet->get_header_lock());
-            SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
-            int64_t new_max_version = 
new_tablet->max_version_unlocked().second;
-            rowsets.clear();
-            if (max_version < new_max_version) {
-                LOG(INFO)
-                        << "alter table for unique with merge-on-write, 
calculate delete bitmap of "
-                        << "incremental rowsets for version: " << max_version 
+ 1 << "-"
-                        << new_max_version;
-                RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
-                        {max_version + 1, new_max_version}, &rowsets));
-            }
-            for (auto rowset_ptr : rowsets) {
-                if (rowset_ptr->version().second <= max_version) {
-                    continue;
-                }
-                
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
-            }
-
-            // step 4
-            res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
+            res = _calc_delete_bitmap_for_mow_table(new_tablet, 
real_alter_version);
             if (!res) {
                 break;
             }
-            new_tablet->save_meta();
         } else {
             // set state to ready
             std::lock_guard<std::shared_mutex> 
new_wlock(new_tablet->get_header_lock());
@@ -1035,7 +991,10 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
     return Status::OK();
 }
 
-Status SchemaChangeHandler::_convert_historical_rowsets(const 
SchemaChangeParams& sc_params) {
+// The `real_alter_version` parameter indicates that the version of 
[0-real_alter_version] is
+// converted from a base tablet, only used for the mow table now.
+Status SchemaChangeHandler::_convert_historical_rowsets(const 
SchemaChangeParams& sc_params,
+                                                        int64_t* 
real_alter_version) {
     LOG(INFO) << "begin to convert historical rowsets for new_tablet from 
base_tablet."
               << " base_tablet=" << sc_params.base_tablet->tablet_id()
               << ", new_tablet=" << sc_params.new_tablet->tablet_id();
@@ -1146,7 +1105,7 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
                          << "tablet=" << sc_params.new_tablet->tablet_id() << 
", version='"
                          << rs_reader->version().first << "-" << 
rs_reader->version().second;
             StorageEngine::instance()->add_unused_rowset(new_rowset);
-            res = Status::OK();
+            return process_alter_exit();
         } else if (!res) {
             LOG(WARNING) << "failed to register new version. "
                          << " tablet=" << sc_params.new_tablet->tablet_id()
@@ -1159,6 +1118,7 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
                         << ", version=" << rs_reader->version().first << "-"
                         << rs_reader->version().second;
         }
+        *real_alter_version = rs_reader->version().second;
 
         VLOG_TRACE << "succeed to convert a history version."
                    << " version=" << rs_reader->version().first << "-"
@@ -1377,4 +1337,67 @@ Status 
SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
     return Status::OK();
 }
 
+// For unique with merge-on-write table, should process delete bitmap here.
+// 1. During double write, the newly imported rowsets does not calculate
+// delete bitmap and publish successfully.
+// 2. After conversion, calculate delete bitmap for the rowsets imported
+// during double write. During this period, new data can still be imported
+// witout calculating delete bitmap and publish successfully.
+// 3. Block the new publish, calculate the delete bitmap of the
+// incremental rowsets.
+// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
+// data will calculate delete bitmap.
+Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr 
new_tablet,
+                                                              int64_t 
alter_version) {
+    
DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed",
 {
+        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+            
LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
+            return Status::InternalError("debug schema change calc delete 
bitmap random failed");
+        }
+    });
+
+    // can't do compaction when calc delete bitmap, if the rowset being 
calculated does
+    // a compaction, it may cause the delete bitmap to be missed.
+    std::lock_guard 
base_compaction_lock(new_tablet->get_base_compaction_lock());
+    std::lock_guard 
cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());
+
+    // step 2
+    int64_t max_version = new_tablet->max_version().second;
+    std::vector<RowsetSharedPtr> rowsets;
+    if (alter_version < max_version) {
+        LOG(INFO) << "alter table for unique with merge-on-write, calculate 
delete bitmap of "
+                  << "double write rowsets for version: " << alter_version + 1 
<< "-" << max_version
+                  << " new_tablet=" << new_tablet->tablet_id();
+        std::shared_lock<std::shared_mutex> 
rlock(new_tablet->get_header_lock());
+        RETURN_IF_ERROR(
+                new_tablet->capture_consistent_rowsets({alter_version + 1, 
max_version}, &rowsets));
+    }
+    for (auto rowset_ptr : rowsets) {
+        std::lock_guard<std::mutex> 
rwlock(new_tablet->get_rowset_update_lock());
+        std::shared_lock<std::shared_mutex> 
rlock(new_tablet->get_header_lock());
+        
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
+    }
+
+    // step 3
+    std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
+    std::lock_guard<std::shared_mutex> 
new_wlock(new_tablet->get_header_lock());
+    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
+    int64_t new_max_version = new_tablet->max_version_unlocked().second;
+    rowsets.clear();
+    if (max_version < new_max_version) {
+        LOG(INFO) << "alter table for unique with merge-on-write, calculate 
delete bitmap of "
+                  << "incremental rowsets for version: " << max_version + 1 << 
"-"
+                  << new_max_version << " new_tablet=" << 
new_tablet->tablet_id();
+        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version + 
1, new_max_version},
+                                                               &rowsets));
+    }
+    for (auto rowset_ptr : rowsets) {
+        
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
+    }
+    // step 4
+    RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
+    new_tablet->save_meta();
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 87b1de31705..949c1d5514c 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -277,7 +277,8 @@ private:
     static Status _validate_alter_result(TabletSharedPtr new_tablet,
                                          const TAlterTabletReqV2& request);
 
-    static Status _convert_historical_rowsets(const SchemaChangeParams& 
sc_params);
+    static Status _convert_historical_rowsets(const SchemaChangeParams& 
sc_params,
+                                              int64_t* real_alter_version);
 
     static Status _parse_request(const SchemaChangeParams& sc_params, 
BlockChanger* changer,
                                  bool* sc_sorting, bool* sc_directly);
@@ -286,6 +287,9 @@ private:
     static Status _init_column_mapping(ColumnMapping* column_mapping,
                                        const TabletColumn& column_schema, 
const std::string& value);
 
+    static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
+                                                    int64_t alter_version);
+
     static std::shared_mutex _mutex;
     static std::unordered_set<int64_t> _tablet_ids_in_converting;
     static std::set<std::string> _supported_functions;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4f1c3229958..922bec70278 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1026,14 +1026,6 @@ bool Tablet::can_do_compaction(size_t path_hash, 
CompactionType compaction_type)
         return false;
     }
 
-    // unique key table with merge-on-write also cann't do cumulative 
compaction under alter
-    // process. It may cause the delete bitmap calculation error, such as two
-    // rowsets have same key.
-    if (tablet_state() != TABLET_RUNNING && keys_type() == UNIQUE_KEYS &&
-        enable_unique_key_merge_on_write()) {
-        return false;
-    }
-
     if (data_dir()->path_hash() != path_hash || !is_used() || 
!init_succeeded()) {
         return false;
     }
@@ -1762,6 +1754,13 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
         }
     }
 
+    // There are two cases when tablet state is TABLET_NOTREADY
+    // case 1: tablet is doing schema change. Fe knows it's state, doing 
nothing.
+    // case 2: tablet has finished schema change, but failed. Fe will perform 
recovery.
+    if (tablet_state() == TABLET_NOTREADY && is_alter_failed()) {
+        tablet_info->__set_used(false);
+    }
+
     if (tablet_state() == TABLET_SHUTDOWN) {
         tablet_info->__set_used(false);
     }
@@ -3287,6 +3286,13 @@ void Tablet::_rowset_ids_difference(const 
RowsetIdUnorderedSet& cur,
 
 // The caller should hold _rowset_update_lock and _meta_lock lock.
 Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& 
rowset) {
+    DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+            
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
+            return Status::InternalError(
+                    "debug tablet update delete bitmap without lock random 
failed");
+        }
+    });
     int64_t cur_version = rowset->end_version();
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 03737ea241b..d55662d75df 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -557,6 +557,8 @@ public:
             SegmentCacheHandle* segment_cache_handle,
             std::unique_ptr<segment_v2::ColumnIterator>* column_iterator,
             OlapReaderStatistics* stats);
+    void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
+    bool is_alter_failed() { return _alter_failed; }
 
 private:
     Status _init_once_action();
@@ -685,6 +687,8 @@ private:
     // may delete compaction input rowsets.
     std::mutex _cold_compaction_lock;
     int64_t _last_failed_follow_cooldown_time = 0;
+    // `_alter_failed` is used to indicate whether the tablet failed to 
perform a schema change
+    std::atomic<bool> _alter_failed = false;
 
     DISALLOW_COPY_AND_ASSIGN(Tablet);
 
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index e27db6188bd..dc851928577 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -171,12 +171,19 @@ Status EngineCloneTask::_do_clone() {
             
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
 
     // The status of a tablet is not ready, indicating that it is a residual 
tablet after a schema
-    // change failure. It should not provide normal read and write, so drop it 
here.
+    // change failure. Clone a new tablet from remote be to overwrite it. This 
situation basically only
+    // occurs when the be_rebalancer_fuzzy_test configuration is enabled.
     if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
         LOG(WARNING) << "tablet state is not ready when clone, need to drop 
old tablet, tablet_id="
                      << tablet->tablet_id();
+        // can not drop tablet when under clone. so unregister clone tablet 
firstly.
+        
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
         
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
                 tablet->tablet_id(), tablet->replica_id(), false));
+        if 
(!StorageEngine::instance()->tablet_manager()->register_clone_tablet(
+                    _clone_req.tablet_id)) {
+            return Status::InternalError("tablet {} is under clone", 
_clone_req.tablet_id);
+        }
         tablet.reset();
     }
     bool is_new_tablet = tablet == nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to