This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 797238cbb71 [fix](merge-on-write) fix schema change may result in
delete bitmap incorrect (#29386)
797238cbb71 is described below
commit 797238cbb71572e5d5b5a0bea778b2020dbe3e4a
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jan 2 23:45:04 2024 +0800
[fix](merge-on-write) fix schema change may result in delete bitmap
incorrect (#29386)
---
be/src/olap/schema_change.cpp | 137 +++++++++++++++++++--------------
be/src/olap/schema_change.h | 6 +-
be/src/olap/tablet.cpp | 22 ++++--
be/src/olap/tablet.h | 4 +
be/src/olap/task/engine_clone_task.cpp | 9 ++-
5 files changed, 111 insertions(+), 67 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index d19d399b1ef..459b1931aea 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -62,6 +62,7 @@
#include "olap/wrapper_field.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
+#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/trace.h"
#include "vec/aggregate_functions/aggregate_function.h"
@@ -711,6 +712,13 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
<< " res=" << res;
return res;
}
+ new_tablet->set_alter_failed(false);
+ Defer defer([&new_tablet] {
+ // if the tablet state is not TABLET_RUNNING on return, the alter has failed.
+ if (new_tablet->tablet_state() != TABLET_RUNNING) {
+ new_tablet->set_alter_failed(true);
+ }
+ });
LOG(INFO) << "finish to validate alter tablet request. begin to convert
data from base tablet "
"to new tablet"
@@ -918,7 +926,8 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
- res = _convert_historical_rowsets(sc_params);
+ int64_t real_alter_version = 0;
+ res = _convert_historical_rowsets(sc_params, &real_alter_version);
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
@@ -927,65 +936,12 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
break;
}
- // For unique with merge-on-write table, should process delete bitmap
here.
- // 1. During double write, the newly imported rowsets does not
calculate
- // delete bitmap and publish successfully.
- // 2. After conversion, calculate delete bitmap for the rowsets
imported
- // during double write. During this period, new data can still be
imported
- // witout calculating delete bitmap and publish successfully.
- // 3. Block the new publish, calculate the delete bitmap of the
- // incremental rowsets.
- // 4. Switch the tablet status to TABLET_RUNNING. The newly imported
- // data will calculate delete bitmap.
if (new_tablet->keys_type() == UNIQUE_KEYS &&
new_tablet->enable_unique_key_merge_on_write()) {
- // step 2
- int64_t max_version = new_tablet->max_version().second;
- std::vector<RowsetSharedPtr> rowsets;
- if (end_version < max_version) {
- LOG(INFO)
- << "alter table for unique with merge-on-write,
calculate delete bitmap of "
- << "double write rowsets for version: " << end_version
+ 1 << "-"
- << max_version;
- RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
- {end_version + 1, max_version}, &rowsets));
- }
- for (auto rowset_ptr : rowsets) {
- if (rowset_ptr->version().second <= end_version) {
- continue;
- }
- std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
- std::shared_lock<std::shared_mutex>
wrlock(new_tablet->get_header_lock());
-
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
- }
-
- // step 3
- std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
- std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
- SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
- int64_t new_max_version =
new_tablet->max_version_unlocked().second;
- rowsets.clear();
- if (max_version < new_max_version) {
- LOG(INFO)
- << "alter table for unique with merge-on-write,
calculate delete bitmap of "
- << "incremental rowsets for version: " << max_version
+ 1 << "-"
- << new_max_version;
- RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
- {max_version + 1, new_max_version}, &rowsets));
- }
- for (auto rowset_ptr : rowsets) {
- if (rowset_ptr->version().second <= max_version) {
- continue;
- }
-
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
- }
-
- // step 4
- res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
+ res = _calc_delete_bitmap_for_mow_table(new_tablet,
real_alter_version);
if (!res) {
break;
}
- new_tablet->save_meta();
} else {
// set state to ready
std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
@@ -1035,7 +991,10 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
return Status::OK();
}
-Status SchemaChangeHandler::_convert_historical_rowsets(const
SchemaChangeParams& sc_params) {
+// The `real_alter_version` parameter indicates that the version of
[0-real_alter_version] is
+// converted from a base tablet, only used for the mow table now.
+Status SchemaChangeHandler::_convert_historical_rowsets(const
SchemaChangeParams& sc_params,
+ int64_t*
real_alter_version) {
LOG(INFO) << "begin to convert historical rowsets for new_tablet from
base_tablet."
<< " base_tablet=" << sc_params.base_tablet->tablet_id()
<< ", new_tablet=" << sc_params.new_tablet->tablet_id();
@@ -1146,7 +1105,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< "tablet=" << sc_params.new_tablet->tablet_id() <<
", version='"
<< rs_reader->version().first << "-" <<
rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
- res = Status::OK();
+ return process_alter_exit();
} else if (!res) {
LOG(WARNING) << "failed to register new version. "
<< " tablet=" << sc_params.new_tablet->tablet_id()
@@ -1159,6 +1118,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
+ *real_alter_version = rs_reader->version().second;
VLOG_TRACE << "succeed to convert a history version."
<< " version=" << rs_reader->version().first << "-"
@@ -1377,4 +1337,67 @@ Status
SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
return Status::OK();
}
+// For unique with merge-on-write table, should process delete bitmap here.
+// 1. During double write, the newly imported rowsets do not calculate
+// delete bitmap and publish successfully.
+// 2. After conversion, calculate delete bitmap for the rowsets imported
+// during double write. During this period, new data can still be imported
+// without calculating delete bitmap and publish successfully.
+// 3. Block the new publish, calculate the delete bitmap of the
+// incremental rowsets.
+// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
+// data will calculate delete bitmap.
+Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr
new_tablet,
+ int64_t
alter_version) {
+
DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed",
{
+ if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+
LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
+ return Status::InternalError("debug schema change calc delete
bitmap random failed");
+ }
+ });
+
+ // Compaction must not run while the delete bitmap is being calculated; if a rowset
+ // being calculated is compacted, its delete bitmap may be missed.
+ std::lock_guard
base_compaction_lock(new_tablet->get_base_compaction_lock());
+ std::lock_guard
cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());
+
+ // step 2
+ int64_t max_version = new_tablet->max_version().second;
+ std::vector<RowsetSharedPtr> rowsets;
+ if (alter_version < max_version) {
+ LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
+ << "double write rowsets for version: " << alter_version + 1
<< "-" << max_version
+ << " new_tablet=" << new_tablet->tablet_id();
+ std::shared_lock<std::shared_mutex>
rlock(new_tablet->get_header_lock());
+ RETURN_IF_ERROR(
+ new_tablet->capture_consistent_rowsets({alter_version + 1,
max_version}, &rowsets));
+ }
+ for (auto rowset_ptr : rowsets) {
+ std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
+ std::shared_lock<std::shared_mutex>
rlock(new_tablet->get_header_lock());
+
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
+ }
+
+ // step 3
+ std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
+ std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
+ int64_t new_max_version = new_tablet->max_version_unlocked().second;
+ rowsets.clear();
+ if (max_version < new_max_version) {
+ LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
+ << "incremental rowsets for version: " << max_version + 1 <<
"-"
+ << new_max_version << " new_tablet=" <<
new_tablet->tablet_id();
+ RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version +
1, new_max_version},
+ &rowsets));
+ }
+ for (auto rowset_ptr : rowsets) {
+
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
+ }
+ // step 4
+ RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
+ new_tablet->save_meta();
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 87b1de31705..949c1d5514c 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -277,7 +277,8 @@ private:
static Status _validate_alter_result(TabletSharedPtr new_tablet,
const TAlterTabletReqV2& request);
- static Status _convert_historical_rowsets(const SchemaChangeParams&
sc_params);
+ static Status _convert_historical_rowsets(const SchemaChangeParams&
sc_params,
+ int64_t* real_alter_version);
static Status _parse_request(const SchemaChangeParams& sc_params,
BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);
@@ -286,6 +287,9 @@ private:
static Status _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema,
const std::string& value);
+ static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
+ int64_t alter_version);
+
static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4f1c3229958..922bec70278 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1026,14 +1026,6 @@ bool Tablet::can_do_compaction(size_t path_hash,
CompactionType compaction_type)
return false;
}
- // unique key table with merge-on-write also cann't do cumulative
compaction under alter
- // process. It may cause the delete bitmap calculation error, such as two
- // rowsets have same key.
- if (tablet_state() != TABLET_RUNNING && keys_type() == UNIQUE_KEYS &&
- enable_unique_key_merge_on_write()) {
- return false;
- }
-
if (data_dir()->path_hash() != path_hash || !is_used() ||
!init_succeeded()) {
return false;
}
@@ -1762,6 +1754,13 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
}
}
+ // There are two cases when tablet state is TABLET_NOTREADY
+ // case 1: tablet is doing schema change. Fe knows its state, so nothing is done.
+ // case 2: tablet has finished schema change, but failed. Fe will perform
recovery.
+ if (tablet_state() == TABLET_NOTREADY && is_alter_failed()) {
+ tablet_info->__set_used(false);
+ }
+
if (tablet_state() == TABLET_SHUTDOWN) {
tablet_info->__set_used(false);
}
@@ -3287,6 +3286,13 @@ void Tablet::_rowset_ids_difference(const
RowsetIdUnorderedSet& cur,
// The caller should hold _rowset_update_lock and _meta_lock lock.
Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr&
rowset) {
+ DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
+ return Status::InternalError(
+ "debug tablet update delete bitmap without lock random
failed");
+ }
+ });
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 03737ea241b..d55662d75df 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -557,6 +557,8 @@ public:
SegmentCacheHandle* segment_cache_handle,
std::unique_ptr<segment_v2::ColumnIterator>* column_iterator,
OlapReaderStatistics* stats);
+ void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
+ bool is_alter_failed() { return _alter_failed; }
private:
Status _init_once_action();
@@ -685,6 +687,8 @@ private:
// may delete compaction input rowsets.
std::mutex _cold_compaction_lock;
int64_t _last_failed_follow_cooldown_time = 0;
+ // `_alter_failed` is used to indicate whether the tablet failed to
perform a schema change
+ std::atomic<bool> _alter_failed = false;
DISALLOW_COPY_AND_ASSIGN(Tablet);
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index e27db6188bd..dc851928577 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -171,12 +171,19 @@ Status EngineCloneTask::_do_clone() {
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
// The status of a tablet is not ready, indicating that it is a residual
tablet after a schema
- // change failure. It should not provide normal read and write, so drop it
here.
+ // change failure. Clone a new tablet from remote be to overwrite it. This
situation basically only
+ // occurs when the be_rebalancer_fuzzy_test configuration is enabled.
if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "tablet state is not ready when clone, need to drop
old tablet, tablet_id="
<< tablet->tablet_id();
+ // A tablet cannot be dropped while it is under clone, so unregister the clone tablet first.
+
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
tablet->tablet_id(), tablet->replica_id(), false));
+ if
(!StorageEngine::instance()->tablet_manager()->register_clone_tablet(
+ _clone_req.tablet_id)) {
+ return Status::InternalError("tablet {} is under clone",
_clone_req.tablet_id);
+ }
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]