This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 6f8ea15a729 [fix](merge-on-write) fix duplicate key in schema change (#25705) (#25914)
6f8ea15a729 is described below
commit 6f8ea15a729c13c884e146298d5e9100ac535d29
Author: Xin Liao <[email protected]>
AuthorDate: Wed Oct 25 21:18:03 2023 +0800
[fix](merge-on-write) fix duplicate key in schema change (#25705) (#25914)
---
be/src/olap/delta_writer.cpp | 16 +++---------
be/src/olap/full_compaction.cpp | 3 +--
be/src/olap/schema_change.cpp | 13 +++-------
be/src/olap/storage_engine.cpp | 7 ++++++
be/src/olap/tablet.cpp | 32 ++++++++++++++----------
be/src/olap/tablet.h | 2 +-
be/src/olap/tablet_meta.cpp | 5 ----
be/src/olap/task/engine_clone_task.cpp | 10 ++++++++
be/src/olap/task/engine_publish_version_task.cpp | 32 ++++++++++++++++--------
9 files changed, 66 insertions(+), 54 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 87316c3cfba..3634e19c68c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -164,8 +164,7 @@ Status DeltaWriter::init() {
std::lock_guard<std::shared_mutex> lck(_tablet->get_header_lock());
_cur_max_version = _tablet->max_version_unlocked().second;
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+ if (_tablet->tablet_state() == TABLET_NOTREADY) {
// Disable 'partial_update' when the tablet is undergoing a
'schema changing process'
if (_req.table_schema_param->is_partial_update()) {
return Status::InternalError(
@@ -174,7 +173,7 @@ Status DeltaWriter::init() {
}
_rowset_ids.clear();
} else {
- _rowset_ids = _tablet->all_rs_id(_cur_max_version);
+ RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version,
&_rowset_ids));
}
}
@@ -459,8 +458,7 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() {
std::lock_guard<std::mutex> l(_lock);
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+ if (_tablet->tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, delete bitmap will be
calculated later, "
"tablet_id: "
<< _tablet->tablet_id() << " txn_id: " << _req.txn_id;
@@ -469,11 +467,6 @@ Status DeltaWriter::submit_calc_delete_bitmap_task() {
auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- return Status::OK();
- }
if (segments.size() > 1) {
// calculate delete bitmap between segments
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset,
segments,
@@ -510,8 +503,7 @@ Status DeltaWriter::commit_txn(const PSlaveTabletNodes&
slave_tablet_nodes,
const bool write_single_replica) {
if (_tablet->enable_unique_key_merge_on_write() &&
config::enable_merge_on_write_correctness_check &&
_cur_rowset->num_rows() != 0 &&
- !(_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id()))) {
+ _tablet->tablet_state() != TABLET_NOTREADY) {
auto st = _tablet->check_delete_bitmap_correctness(
_delete_bitmap, _cur_rowset->end_version() - 1, _req.txn_id,
_rowset_ids);
if (!st.ok()) {
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 9b832ec5552..e95d6750ad3 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -147,8 +147,7 @@ Status
FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP
std::vector<RowsetSharedPtr> tmp_rowsets {};
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+ if (_tablet->tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, update delete bitmap
later, tablet_id="
<< _tablet->tablet_id();
return Status::OK();
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 740255b4325..acd468a47e8 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -916,12 +916,9 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
res = _convert_historical_rowsets(sc_params);
- if (new_tablet->keys_type() != UNIQUE_KEYS ||
- !new_tablet->enable_unique_key_merge_on_write() || !res) {
- {
- std::lock_guard<std::shared_mutex> wrlock(_mutex);
- _tablet_ids_in_converting.erase(new_tablet->tablet_id());
- }
+ {
+ std::lock_guard<std::shared_mutex> wrlock(_mutex);
+ _tablet_ids_in_converting.erase(new_tablet->tablet_id());
}
if (!res) {
break;
@@ -981,10 +978,6 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
// step 4
- {
- std::lock_guard<std::shared_mutex> wrlock(_mutex);
- _tablet_ids_in_converting.erase(new_tablet->tablet_id());
- }
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (!res) {
break;
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 52d5072f7d3..a4ecf33a0bb 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1082,6 +1082,13 @@ void StorageEngine::start_delete_unused_rowset() {
VLOG_NOTICE << "start to remove rowset:" << it->second->rowset_id()
<< ", version:" << it->second->version().first << "-"
<< it->second->version().second;
+ auto tablet_id = it->second->rowset_meta()->tablet_id();
+ auto tablet = _tablet_manager->get_tablet(tablet_id);
+ // delete delete_bitmap of unused rowsets
+ if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
+
tablet->tablet_meta()->delete_bitmap().remove({it->second->rowset_id(), 0, 0},
+
{it->second->rowset_id(), UINT32_MAX, 0});
+ }
Status status = it->second->remove();
VLOG_NOTICE << "remove rowset:" << it->second->rowset_id()
<< " finished. status:" << status;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 527a8b1c372..8a18f0d1d1b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3320,7 +3320,8 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
<< tablet_id() << " cur max_version: " << cur_version;
return Status::OK();
}
- RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1);
+ RowsetIdUnorderedSet cur_rowset_ids;
+ RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments,
delete_bitmap));
@@ -3370,7 +3371,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
{
std::shared_lock meta_rlock(_meta_lock);
cur_version = max_version_unlocked().second;
- cur_rowset_ids = all_rs_id(cur_version);
+ RETURN_IF_ERROR(all_rs_id(cur_version, &cur_rowset_ids));
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids,
&rowset_ids_to_add,
&rowset_ids_to_del);
specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
@@ -3411,13 +3412,12 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
{
std::shared_lock meta_rlock(_meta_lock);
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(tablet_id())) {
+ if (tablet_state() == TABLET_NOTREADY) {
LOG(INFO) << "tablet is under alter process, update delete bitmap
later, tablet_id="
<< tablet_id();
return Status::OK();
}
- cur_rowset_ids = all_rs_id(cur_version - 1);
+ RETURN_IF_ERROR(all_rs_id(cur_version - 1, &cur_rowset_ids));
}
auto t2 = watch.get_elapse_time_us();
@@ -3454,7 +3454,7 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
- << ", cur max_version: " << cur_version << ", transaction_id: "
<< txn_id << ","
+ << ", cur version: " << cur_version << ", transaction_id: " <<
txn_id << ","
<< ss.str() << " , total rows: " << total_rows;
if (config::enable_merge_on_write_correctness_check && rowset->num_rows()
!= 0) {
@@ -3586,18 +3586,24 @@ Status Tablet::check_rowid_conversion(
return Status::OK();
}
-RowsetIdUnorderedSet Tablet::all_rs_id(int64_t max_version) const {
- RowsetIdUnorderedSet rowset_ids;
- for (const auto& rs_it : _rs_version_map) {
- if (rs_it.first.second == 1) {
+Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const {
+ // Ensure that the obtained versions of rowsets are continuous
+ std::vector<Version> version_path;
+ RETURN_IF_ERROR(capture_consistent_versions(Version(0, max_version),
&version_path));
+ for (auto& ver : version_path) {
+ if (ver.second == 1) {
// [0-1] rowset is empty for each tablet, skip it
continue;
}
- if (rs_it.first.second <= max_version) {
- rowset_ids.insert(rs_it.second->rowset_id());
+ auto it = _rs_version_map.find(ver);
+ if (it == _rs_version_map.end()) {
+ return Status::Error<CAPTURE_ROWSET_ERROR, false>(
+ "fail to find Rowset for version. tablet={}, version={}",
tablet_id(),
+ ver.to_string());
}
+ rowset_ids->emplace(it->second->rowset_id());
}
- return rowset_ids;
+ return Status::OK();
}
bool Tablet::check_all_rowset_segment() {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 9e665927d31..7fbf2b9a98f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -508,7 +508,7 @@ public:
RowsetSharedPtr dst_rowset,
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation,
RowLocation>>>&
location_map);
- RowsetIdUnorderedSet all_rs_id(int64_t max_version) const;
+ Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids)
const;
void sort_block(vectorized::Block& in_block, vectorized::Block&
output_block);
bool check_all_rowset_segment();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index b407e35a449..69ea6c8d4e8 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -765,11 +765,6 @@ void TabletMeta::modify_rs_metas(const
std::vector<RowsetMetaSharedPtr>& to_add,
++it;
}
}
- // delete delete_bitmap of to_delete's rowsets if not added to
_stale_rs_metas.
- if (same_version && _enable_unique_key_merge_on_write) {
- delete_bitmap().remove({rs_to_del->rowset_id(), 0, 0},
- {rs_to_del->rowset_id(), UINT32_MAX, 0});
- }
}
if (!same_version) {
// put to_delete rowsets in _stale_rs_metas.
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 4eb43864bec..3621a958eff 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -111,6 +111,16 @@ Status EngineCloneTask::_do_clone() {
// Check local tablet exist or not
TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);
+
+ // The status of a tablet is not ready, indicating that it is a residual
tablet after a schema
+ // change failure. It should not provide normal read and write, so drop it
here.
+ if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
+ LOG(WARNING) << "tablet state is not ready when clone, need to drop
old tablet, tablet_id="
+ << tablet->tablet_id();
+
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
+ tablet->tablet_id(), tablet->replica_id(), false));
+ tablet.reset();
+ }
bool is_new_tablet = tablet == nullptr;
// try to incremental clone
std::vector<Version> missed_versions;
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 702c4386f11..d6642705baf 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -153,26 +153,24 @@ Status EnginePublishVersionTask::finish() {
StorageEngine::instance()->txn_manager()->update_tablet_version_txn(
tablet_info.tablet_id, version.second,
transaction_id);
}
- Version max_version;
+ int64_t max_version;
TabletState tablet_state;
{
std::shared_lock rdlock(tablet->get_header_lock());
- max_version = tablet->max_version_unlocked();
+ max_version = tablet->max_version_unlocked().second;
tablet_state = tablet->tablet_state();
}
- if (tablet_state == TabletState::TABLET_RUNNING &&
- version.first != max_version.second + 1) {
- // If a tablet migrates out and back, the previously failed
- // publish task may retry on the new tablet, so check
- // whether the version exists. if not exist, then set
- // publish failed
- if (!tablet->check_version_exist(version)) {
+ if (version.first != max_version + 1) {
+ if (tablet->check_version_exist(version)) {
+ continue;
+ }
+ auto handle_version_not_continuous = [&]() {
add_error_tablet_id(tablet_info.tablet_id);
_discontinuous_version_tablets->emplace_back(
partition_id, tablet_info.tablet_id,
version.first);
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"check_version_exist failed");
- int64_t missed_version = max_version.second + 1;
+ int64_t missed_version = max_version + 1;
int64_t missed_txn_id =
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
tablet->tablet_id(), missed_version);
@@ -187,8 +185,20 @@ Status EnginePublishVersionTask::finish() {
} else {
LOG_EVERY_SECOND(INFO) << msg;
}
+ };
+ // The versions during the schema change period need to be
also continuous
+ if (tablet_state == TabletState::TABLET_NOTREADY) {
+ Version max_continuous_version = {-1, 0};
+
tablet->max_continuous_version_from_beginning(&max_continuous_version);
+ if (max_version > 1 && version.first > max_version &&
+ max_continuous_version.second != max_version) {
+ handle_version_not_continuous();
+ continue;
+ }
+ } else {
+ handle_version_not_continuous();
+ continue;
}
- continue;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]