This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 10b9205600b [fix](merge-on-write) fix schema change may result in
delete bitmap incorrect (#29386) (#29608)
10b9205600b is described below
commit 10b9205600ba31598cea643e581fc40279f40a91
Author: Xin Liao <[email protected]>
AuthorDate: Sun Jan 7 19:20:23 2024 +0800
[fix](merge-on-write) fix schema change may result in delete bitmap
incorrect (#29386) (#29608)
---
be/src/olap/schema_change.cpp | 137 +++++++++++++++++++--------------
be/src/olap/schema_change.h | 6 +-
be/src/olap/tablet.cpp | 23 ++++--
be/src/olap/tablet.h | 4 +
be/src/olap/task/engine_clone_task.cpp | 9 ++-
5 files changed, 112 insertions(+), 67 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index fc20ecab35d..8882021f2a3 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -63,6 +63,7 @@
#include "olap/wrapper_field.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
+#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/trace.h"
#include "vec/aggregate_functions/aggregate_function.h"
@@ -711,6 +712,13 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
<< " res=" << res;
return res;
}
+ new_tablet->set_alter_failed(false);
+ Defer defer([&new_tablet] {
+ // If the tablet state is not TABLET_RUNNING on return, the alter has failed.
+ if (new_tablet->tablet_state() != TABLET_RUNNING) {
+ new_tablet->set_alter_failed(true);
+ }
+ });
LOG(INFO) << "finish to validate alter tablet request. begin to convert
data from base tablet "
"to new tablet"
@@ -919,7 +927,8 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
- res = _convert_historical_rowsets(sc_params);
+ int64_t real_alter_version = 0;
+ res = _convert_historical_rowsets(sc_params, &real_alter_version);
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
@@ -928,65 +937,12 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
break;
}
- // For unique with merge-on-write table, should process delete bitmap
here.
- // 1. During double write, the newly imported rowsets does not
calculate
- // delete bitmap and publish successfully.
- // 2. After conversion, calculate delete bitmap for the rowsets
imported
- // during double write. During this period, new data can still be
imported
- // witout calculating delete bitmap and publish successfully.
- // 3. Block the new publish, calculate the delete bitmap of the
- // incremental rowsets.
- // 4. Switch the tablet status to TABLET_RUNNING. The newly imported
- // data will calculate delete bitmap.
if (new_tablet->keys_type() == UNIQUE_KEYS &&
new_tablet->enable_unique_key_merge_on_write()) {
- // step 2
- int64_t max_version = new_tablet->max_version().second;
- std::vector<RowsetSharedPtr> rowsets;
- if (end_version < max_version) {
- LOG(INFO)
- << "alter table for unique with merge-on-write,
calculate delete bitmap of "
- << "double write rowsets for version: " << end_version
+ 1 << "-"
- << max_version;
- RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
- {end_version + 1, max_version}, &rowsets));
- }
- for (auto rowset_ptr : rowsets) {
- if (rowset_ptr->version().second <= end_version) {
- continue;
- }
- std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
- std::shared_lock<std::shared_mutex>
wrlock(new_tablet->get_header_lock());
-
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
- }
-
- // step 3
- std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
- std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
- SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
- int64_t new_max_version =
new_tablet->max_version_unlocked().second;
- rowsets.clear();
- if (max_version < new_max_version) {
- LOG(INFO)
- << "alter table for unique with merge-on-write,
calculate delete bitmap of "
- << "incremental rowsets for version: " << max_version
+ 1 << "-"
- << new_max_version;
- RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
- {max_version + 1, new_max_version}, &rowsets));
- }
- for (auto rowset_ptr : rowsets) {
- if (rowset_ptr->version().second <= max_version) {
- continue;
- }
-
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
- }
-
- // step 4
- res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
+ res = _calc_delete_bitmap_for_mow_table(new_tablet,
real_alter_version);
if (!res) {
break;
}
- new_tablet->save_meta();
} else {
// set state to ready
std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
@@ -1036,7 +992,10 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
return Status::OK();
}
-Status SchemaChangeHandler::_convert_historical_rowsets(const
SchemaChangeParams& sc_params) {
+// `real_alter_version` receives the end version of the last rowset converted from the base
+// tablet, i.e. versions [0, real_alter_version] were converted from the base tablet. It is
+// currently only used for merge-on-write tables.
+Status SchemaChangeHandler::_convert_historical_rowsets(const
SchemaChangeParams& sc_params,
+ int64_t*
real_alter_version) {
LOG(INFO) << "begin to convert historical rowsets for new_tablet from
base_tablet."
<< " base_tablet=" << sc_params.base_tablet->full_name()
<< ", new_tablet=" << sc_params.new_tablet->full_name();
@@ -1148,7 +1107,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< "tablet=" << sc_params.new_tablet->full_name() <<
", version='"
<< rs_reader->version().first << "-" <<
rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
- res = Status::OK();
+ return process_alter_exit();
} else if (!res) {
LOG(WARNING) << "failed to register new version. "
<< " tablet=" << sc_params.new_tablet->full_name()
@@ -1161,6 +1120,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
+ *real_alter_version = rs_reader->version().second;
VLOG_TRACE << "succeed to convert a history version."
<< " version=" << rs_reader->version().first << "-"
@@ -1384,4 +1344,67 @@ Status
SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
return Status::OK();
}
+// For unique-key tables with merge-on-write enabled, the delete bitmap is processed here.
+// 1. During double write, newly imported rowsets are published successfully without
+//    calculating their delete bitmaps.
+// 2. After conversion, calculate the delete bitmap for the rowsets imported during
+//    double write. During this period, new data can still be imported and published
+//    successfully without calculating its delete bitmap.
+// 3. Block new publishes and calculate the delete bitmap of the incremental rowsets.
+// 4. Switch the tablet state to TABLET_RUNNING. Newly imported data will then have
+//    its delete bitmap calculated.
+Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr
new_tablet,
+ int64_t
alter_version) {
+
DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed",
{
+ if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+
LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
+ return Status::InternalError("debug schema change calc delete
bitmap random failed");
+ }
+ });
+
+ // Compaction must not run while delete bitmaps are being calculated: if a rowset being
+ // calculated is compacted concurrently, its delete bitmap may be missed.
+ // a compaction, it may cause the delete bitmap to be missed.
+ std::lock_guard
base_compaction_lock(new_tablet->get_base_compaction_lock());
+ std::lock_guard
cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());
+
+ // step 2
+ int64_t max_version = new_tablet->max_version().second;
+ std::vector<RowsetSharedPtr> rowsets;
+ if (alter_version < max_version) {
+ LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
+ << "double write rowsets for version: " << alter_version + 1
<< "-" << max_version
+ << " new_tablet=" << new_tablet->tablet_id();
+ std::shared_lock<std::shared_mutex>
rlock(new_tablet->get_header_lock());
+ RETURN_IF_ERROR(
+ new_tablet->capture_consistent_rowsets({alter_version + 1,
max_version}, &rowsets));
+ }
+ for (auto rowset_ptr : rowsets) {
+ std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
+ std::shared_lock<std::shared_mutex>
rlock(new_tablet->get_header_lock());
+
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
+ }
+
+ // step 3
+ std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
+ std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
+ int64_t new_max_version = new_tablet->max_version_unlocked().second;
+ rowsets.clear();
+ if (max_version < new_max_version) {
+ LOG(INFO) << "alter table for unique with merge-on-write, calculate
delete bitmap of "
+ << "incremental rowsets for version: " << max_version + 1 <<
"-"
+ << new_max_version << " new_tablet=" <<
new_tablet->tablet_id();
+ RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version +
1, new_max_version},
+ &rowsets));
+ }
+ for (auto rowset_ptr : rowsets) {
+
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
+ }
+ // step 4
+ RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
+ new_tablet->save_meta();
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 2c5075b9ce5..60942f671b0 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -272,7 +272,8 @@ private:
static Status _validate_alter_result(TabletSharedPtr new_tablet,
const TAlterTabletReqV2& request);
- static Status _convert_historical_rowsets(const SchemaChangeParams&
sc_params);
+ static Status _convert_historical_rowsets(const SchemaChangeParams&
sc_params,
+ int64_t* real_alter_version);
static Status _parse_request(const SchemaChangeParams& sc_params,
BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);
@@ -281,6 +282,9 @@ private:
static Status _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema,
const std::string& value);
+ static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
+ int64_t alter_version);
+
static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index a7dc17ef764..1699756d01e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -112,6 +112,7 @@
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "util/bvar_helper.h"
+#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
@@ -1013,14 +1014,6 @@ bool Tablet::can_do_compaction(size_t path_hash,
CompactionType compaction_type)
return false;
}
- // unique key table with merge-on-write also cann't do cumulative
compaction under alter
- // process. It may cause the delete bitmap calculation error, such as two
- // rowsets have same key.
- if (tablet_state() != TABLET_RUNNING && keys_type() == UNIQUE_KEYS &&
- enable_unique_key_merge_on_write()) {
- return false;
- }
-
if (data_dir()->path_hash() != path_hash || !is_used() ||
!init_succeeded()) {
return false;
}
@@ -1746,6 +1739,13 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
}
}
+ // There are two cases in which the tablet state is TABLET_NOTREADY:
+ // case 1: the tablet is undergoing a schema change. FE knows its state, so do nothing.
+ // case 2: the tablet's schema change has finished but failed. FE will perform recovery.
+ if (tablet_state() == TABLET_NOTREADY && is_alter_failed()) {
+ tablet_info->__set_used(false);
+ }
+
if (tablet_state() == TABLET_SHUTDOWN) {
tablet_info->__set_used(false);
}
@@ -3297,6 +3297,13 @@ void Tablet::_rowset_ids_difference(const
RowsetIdUnorderedSet& cur,
// The caller should hold _rowset_update_lock and _meta_lock lock.
Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr&
rowset) {
+ DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
+ return Status::InternalError(
+ "debug tablet update delete bitmap without lock random
failed");
+ }
+ });
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 2a3c2cf4248..626f11031dc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -570,6 +570,8 @@ public:
Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap,
int64_t max_version,
int64_t txn_id, const
RowsetIdUnorderedSet& rowset_ids,
std::vector<RowsetSharedPtr>*
rowsets = nullptr);
+ void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
+ bool is_alter_failed() { return _alter_failed; }
private:
Status _init_once_action();
@@ -704,6 +706,8 @@ private:
// may delete compaction input rowsets.
std::mutex _cold_compaction_lock;
int64_t _last_failed_follow_cooldown_time = 0;
+ // `_alter_failed` is used to indicate whether the tablet failed to
perform a schema change
+ std::atomic<bool> _alter_failed = false;
DISALLOW_COPY_AND_ASSIGN(Tablet);
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 8f27cea82b9..8a9d6de7690 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -165,12 +165,19 @@ Status EngineCloneTask::_do_clone() {
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
// The status of a tablet is not ready, indicating that it is a residual
tablet after a schema
- // change failure. It should not provide normal read and write, so drop it
here.
+ // change failure. Clone a new tablet from a remote BE to overwrite it. This situation
+ // basically only occurs when the be_rebalancer_fuzzy_test configuration is enabled.
if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "tablet state is not ready when clone, need to drop
old tablet, tablet_id="
<< tablet->tablet_id();
+ // A tablet cannot be dropped while it is registered as under clone, so unregister the
+ // clone tablet first and register it again after the drop.
+
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
tablet->tablet_id(), tablet->replica_id(), false));
+ if
(!StorageEngine::instance()->tablet_manager()->register_clone_tablet(
+ _clone_req.tablet_id)) {
+ return Status::InternalError("tablet {} is under clone",
_clone_req.tablet_id);
+ }
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]