This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d2688ae6021 [feature](merge-cloud) schema change for mow table (#31819)
d2688ae6021 is described below
commit d2688ae6021314f4e9d2341a7e08f8fe9d43e97a
Author: Xin Liao <[email protected]>
AuthorDate: Wed Mar 6 00:11:50 2024 +0800
[feature](merge-cloud) schema change for mow table (#31819)
---
be/src/cloud/cloud_schema_change_job.cpp | 80 +++++++++++++++++++++++++++++++-
be/src/cloud/cloud_schema_change_job.h | 3 ++
be/src/olap/base_tablet.cpp | 55 ++++++++++++++++++++++
be/src/olap/base_tablet.h | 3 ++
be/src/olap/tablet.cpp | 55 ----------------------
be/src/olap/tablet.h | 3 --
6 files changed, 140 insertions(+), 59 deletions(-)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 2099f22f1cd..cd7e0744324 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -39,6 +39,7 @@ namespace doris {
using namespace ErrorCode;
static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger&
changer,
bool sc_sorting) {
@@ -234,6 +235,7 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}
// 3. Convert historical data
+ bool already_exist_any_version = false;
for (const auto& rs_reader : sc_params.ref_rowset_readers) {
VLOG_TRACE << "Begin to convert a history rowset. version=" <<
rs_reader->version();
@@ -264,6 +266,7 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr,
_new_tablet->tablet_path(),
existed_rs_meta,
&rowset));
_output_rowsets.push_back(std::move(rowset));
+ already_exist_any_version = true;
continue;
} else {
return st;
@@ -327,7 +330,18 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
_output_cumulative_point = std::min(_output_cumulative_point,
sc_job->alter_version() + 1);
sc_job->set_output_cumulative_point(_output_cumulative_point);
- // TODO(Lchangliang): process delete bitmap if the table is MOW
+ // process delete bitmap if the table is MOW
+ if (_new_tablet->enable_unique_key_merge_on_write()) {
+ int64_t initiator =
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
+ std::numeric_limits<int64_t>::max();
+ // If there are historical versions of rowsets, we need to recalculate
their delete
+ // bitmaps, otherwise we will miss the delete bitmaps of incremental
rowsets
+ int64_t start_calc_delete_bitmap_version =
+ already_exist_any_version ? 0 : sc_job->alter_version() + 1;
+ RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
+
start_calc_delete_bitmap_version, initiator));
+ sc_job->set_delete_bitmap_lock_initiator(initiator);
+ }
cloud::FinishTabletJobResponse finish_resp;
st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
@@ -361,4 +375,68 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
}
return Status::OK();
}
+// Recomputes MoW delete bitmaps on a scratch copy of _new_tablet, stores them via meta_mgr, then copies them back.
+Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
+ int64_t
start_calc_delete_bitmap_version,
+ int64_t initiator) {
+ LOG_INFO("process mow table")
+ .tag("new_tablet_id", _new_tablet->tablet_id())
+ .tag("out_rowset_size", _output_rowsets.size())
+ .tag("start_calc_delete_bitmap_version",
start_calc_delete_bitmap_version)
+ .tag("alter_version", alter_version); // alter_version is only logged in this function
+ TabletMetaSharedPtr tmp_meta =
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta())); // copy of the new tablet's meta
+ tmp_meta->delete_bitmap().delete_bitmap.clear(); // the scratch copy starts with an empty delete bitmap
+ std::shared_ptr<CloudTablet> tmp_tablet =
+ std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
+ {
+ std::unique_lock wlock(tmp_tablet->get_header_lock());
+ tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+ }
+
+ // step 1, process incremental rowset without delete bitmap update lock
+ std::vector<RowsetSharedPtr> incremental_rowsets;
+
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+ int64_t max_version = tmp_tablet->max_version().second; // upper bound of the range handled in step 1
+ LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
+ << "incremental rowsets without lock, version: " <<
start_calc_delete_bitmap_version
+ << "-" << max_version << " new_table_id: " <<
_new_tablet->tablet_id();
+ if (max_version >= start_calc_delete_bitmap_version) {
+ RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
+ {start_calc_delete_bitmap_version, max_version},
&incremental_rowsets));
+ for (auto rowset : incremental_rowsets) {
+
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
+ }
+ }
+
+ // step 2, process incremental rowset with delete bitmap update lock (only versions above step 1's max_version)
+
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
+ *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
+
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+ int64_t new_max_version = tmp_tablet->max_version().second;
+ LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
+ << "incremental rowsets with lock, version: " << max_version + 1
<< "-"
+ << new_max_version << " new_tablet_id: " <<
_new_tablet->tablet_id();
+ std::vector<RowsetSharedPtr> new_incremental_rowsets;
+ if (new_max_version > max_version) {
+ RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
+ {max_version + 1, new_max_version}, &new_incremental_rowsets));
+ { // NOTE(review): re-adds _output_rowsets after the second sync - presumably to keep them in tmp_tablet's rowset map; confirm
+ std::unique_lock wlock(tmp_tablet->get_header_lock());
+ tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+ }
+ for (auto rowset : new_incremental_rowsets) {
+
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
+ }
+ }
+
+ auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
+
+ // step 3, store delete bitmap
+ RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
+ *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator,
&delete_bitmap));
+
+ _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap; // copy the scratch bitmap onto the live tablet meta
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h
b/be/src/cloud/cloud_schema_change_job.h
index 7bb03fda12a..d587111df71 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -39,6 +39,9 @@ public:
private:
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
+ Status _process_delete_bitmap(int64_t alter_version, int64_t
start_calc_delete_bitmap_version,
+ int64_t initiator);
+
private:
CloudStorageEngine& _cloud_storage_engine;
std::shared_ptr<CloudTablet> _base_tablet;
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index fe76d43c7f0..e5a41abdbd9 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1375,4 +1375,59 @@ Status BaseTablet::check_rowid_conversion(
return Status::OK();
}
+// Builds `rowset`'s delete bitmap and merges it into self's tablet meta. The caller should hold _rowset_update_lock and _meta_lock lock.
+Status BaseTablet::update_delete_bitmap_without_lock(const BaseTabletSPtr&
self,
+ const RowsetSharedPtr&
rowset) {
+ DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
+ if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
+ return Status::InternalError(
+ "debug tablet update delete bitmap without lock random
failed");
+ }
+ });
+ int64_t cur_version = rowset->end_version(); // merged bitmap entries are keyed with this version below
+ std::vector<segment_v2::SegmentSharedPtr> segments;
+
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); // NOTE(review): cast result is dereferenced unchecked - assumes rowset is a BetaRowset
+
+ // If this rowset does not have a segment, there is no need for an update.
+ if (segments.empty()) {
+ LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap
tablet: "
+ << self->tablet_id() << " cur max_version: " << cur_version;
+ return Status::OK();
+ }
+ RowsetIdUnorderedSet cur_rowset_ids; // presumably ids of rowsets visible at cur_version - 1 - confirm against get_all_rs_id_unlocked
+ RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1,
&cur_rowset_ids));
+ DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(self->tablet_id());
+ RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset,
segments, delete_bitmap));
+
+ std::vector<RowsetSharedPtr> specified_rowsets =
self->get_rowset_by_ids(&cur_rowset_ids);
+ OlapStopWatch watch;
+ auto token = self->calc_delete_bitmap_executor()->create_token();
+ RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments,
specified_rowsets, delete_bitmap,
+ cur_version - 1, token.get()));
+ RETURN_IF_ERROR(token->wait());
+ size_t total_rows = std::accumulate(
+ segments.begin(), segments.end(), 0, // NOTE(review): init value 0 is int, so the sum is accumulated as int, not size_t
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
+ LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: "
<< self->tablet_id()
+ << ", rowset_ids: " << cur_rowset_ids.size() << ", cur
max_version: " << cur_version
+ << ", transaction_id: " << -1 << ", cost: " <<
watch.get_elapse_time_us()
+ << "(us), total rows: " << total_rows;
+ if (config::enable_merge_on_write_correctness_check) {
+ // check if all the rowset has ROWSET_SENTINEL_MARK
+ auto st = self->check_delete_bitmap_correctness(delete_bitmap,
cur_version - 1, -1,
+ cur_rowset_ids,
&specified_rowsets);
+ if (!st.ok()) { // a failed check is only logged; processing continues
+ LOG(WARNING) << fmt::format("delete bitmap correctness check
failed in publish phase!");
+ }
+ self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
+ }
+ for (auto& iter : delete_bitmap->delete_bitmap) { // re-key each entry with cur_version while merging into the tablet meta
+ self->_tablet_meta->delete_bitmap().merge(
+ {std::get<0>(iter.first), std::get<1>(iter.first),
cur_version}, iter.second);
+ }
+
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 867ff9c1e3f..b59a4303a0b 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -229,6 +229,9 @@ public:
const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation,
RowLocation>>>&
location_map);
+ static Status update_delete_bitmap_without_lock(const BaseTabletSPtr& self,
+ const RowsetSharedPtr&
rowset);
+
////////////////////////////////////////////////////////////////////////////
// end MoW functions
////////////////////////////////////////////////////////////////////////////
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 431577726ad..d1bb734d903 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2300,61 +2300,6 @@ void Tablet::update_max_version_schema(const
TabletSchemaSPtr& tablet_schema) {
}
}
-// The caller should hold _rowset_update_lock and _meta_lock lock.
-Status Tablet::update_delete_bitmap_without_lock(const TabletSharedPtr& self,
- const RowsetSharedPtr&
rowset) {
- DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
- if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
-
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
- return Status::InternalError(
- "debug tablet update delete bitmap without lock random
failed");
- }
- });
- int64_t cur_version = rowset->end_version();
- std::vector<segment_v2::SegmentSharedPtr> segments;
-
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
-
- // If this rowset does not have a segment, there is no need for an update.
- if (segments.empty()) {
- LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap
tablet: "
- << self->tablet_id() << " cur max_version: " << cur_version;
- return Status::OK();
- }
- RowsetIdUnorderedSet cur_rowset_ids;
- RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1,
&cur_rowset_ids));
- DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(self->tablet_id());
- RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset,
segments, delete_bitmap));
-
- std::vector<RowsetSharedPtr> specified_rowsets =
self->get_rowset_by_ids(&cur_rowset_ids);
- OlapStopWatch watch;
- auto token = self->_engine.calc_delete_bitmap_executor()->create_token();
- RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments,
specified_rowsets, delete_bitmap,
- cur_version - 1, token.get()));
- RETURN_IF_ERROR(token->wait());
- size_t total_rows = std::accumulate(
- segments.begin(), segments.end(), 0,
- [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
- LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: "
<< self->tablet_id()
- << ", rowset_ids: " << cur_rowset_ids.size() << ", cur
max_version: " << cur_version
- << ", transaction_id: " << -1 << ", cost: " <<
watch.get_elapse_time_us()
- << "(us), total rows: " << total_rows;
- if (config::enable_merge_on_write_correctness_check) {
- // check if all the rowset has ROWSET_SENTINEL_MARK
- auto st = self->check_delete_bitmap_correctness(delete_bitmap,
cur_version - 1, -1,
- cur_rowset_ids,
&specified_rowsets);
- if (!st.ok()) {
- LOG(WARNING) << fmt::format("delete bitmap correctness check
failed in publish phase!");
- }
- self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
- }
- for (auto& iter : delete_bitmap->delete_bitmap) {
- self->_tablet_meta->delete_bitmap().merge(
- {std::get<0>(iter.first), std::get<1>(iter.first),
cur_version}, iter.second);
- }
-
- return Status::OK();
-}
-
CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() {
return _engine.calc_delete_bitmap_executor();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 0f6cfa9c04f..50970c51c5e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -369,9 +369,6 @@ public:
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
- static Status update_delete_bitmap_without_lock(const TabletSharedPtr&
self,
- const RowsetSharedPtr&
rowset);
-
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]