This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 09609dd2db8 [feature](merge-cloud) support MoW table in cloud mode
(#31317)
09609dd2db8 is described below
commit 09609dd2db8c0ffd40c9d79087e853d7aefbad67
Author: Xin Liao <[email protected]>
AuthorDate: Sat Mar 2 08:36:53 2024 +0800
[feature](merge-cloud) support MoW table in cloud mode (#31317)
---
be/src/agent/agent_server.cpp | 5 +-
be/src/agent/task_worker_pool.cpp | 32 +++
be/src/agent/task_worker_pool.h | 2 +
be/src/cloud/cloud_delete_task.cpp | 10 +-
be/src/cloud/cloud_delta_writer.cpp | 4 +
be/src/cloud/cloud_delta_writer.h | 2 +
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 168 +++++++++++++
.../cloud/cloud_engine_calc_delete_bitmap_task.h | 72 ++++++
be/src/cloud/cloud_rowset_builder.cpp | 30 ++-
be/src/cloud/cloud_rowset_builder.h | 2 +
be/src/cloud/cloud_rowset_writer.cpp | 5 -
be/src/cloud/cloud_rowset_writer.h | 3 -
be/src/cloud/cloud_storage_engine.cpp | 11 +
be/src/cloud/cloud_storage_engine.h | 9 +
be/src/cloud/cloud_tablet.cpp | 67 +++++
be/src/cloud/cloud_tablet.h | 13 +
be/src/cloud/cloud_tablets_channel.cpp | 11 +-
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 190 ++++++++++++++
be/src/cloud/cloud_txn_delete_bitmap_cache.h | 95 +++++++
be/src/common/config.cpp | 4 +
be/src/common/config.h | 4 +
be/src/olap/base_tablet.cpp | 201 +++++++++++++++
be/src/olap/base_tablet.h | 18 ++
be/src/olap/rowset/beta_rowset_writer.cpp | 2 +-
be/src/olap/rowset/beta_rowset_writer.h | 5 +-
be/src/olap/rowset/rowset.cpp | 4 +
be/src/olap/rowset/rowset.h | 1 +
be/src/olap/rowset/segment_v2/segment_writer.cpp | 5 -
.../rowset/segment_v2/vertical_segment_writer.cpp | 9 -
be/src/olap/rowset_builder.cpp | 2 +-
be/src/olap/rowset_builder.h | 4 +-
be/src/olap/tablet.cpp | 211 ++--------------
be/src/olap/tablet.h | 17 +-
be/src/olap/tablet_fwd.h | 2 +
be/src/runtime/memory/cache_policy.h | 3 +
.../org/apache/doris/analysis/CreateTableStmt.java | 5 -
.../main/java/org/apache/doris/catalog/Table.java | 30 ++-
.../transaction/CloudGlobalTransactionMgr.java | 278 ++++++++++++++++++++-
.../org/apache/doris/common/InternalErrorCode.java | 5 +-
.../apache/doris/common/util/MetaLockUtils.java | 24 ++
.../apache/doris/common/util/PropertyAnalyzer.java | 2 +-
.../java/org/apache/doris/master/MasterImpl.java | 31 ++-
.../trees/plans/commands/info/CreateTableInfo.java | 6 -
.../java/org/apache/doris/task/AgentBatchTask.java | 10 +
.../apache/doris/task/CalcDeleteBitmapTask.java | 119 +++++++++
.../cloud_p0/conf/regression-conf-custom.groovy | 6 +-
46 files changed, 1478 insertions(+), 261 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 269ee490224..fc63c1ff837 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -193,12 +193,15 @@ void AgentServer::cloud_start_workers(CloudStorageEngine&
engine, ExecEnv* exec_
"PUSH", config::delete_worker_count,
[&engine](auto&& task) { cloud_push_callback(engine, task); });
// TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker
- // TODO(plat1ko): CALCULATE_DELETE_BITMAP worker
_workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
"ALTER_TABLE", config::alter_tablet_worker_count,
[&engine](auto&& task) { return
alter_cloud_tablet_callback(engine, task); });
+ _workers[TTaskType::CALCULATE_DELETE_BITMAP] =
std::make_unique<TaskWorkerPool>(
+ "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
+ [&engine](auto&& task) { return
calc_delete_bimtap_callback(engine, task); });
+
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info);
}));
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index c4fc258552b..af4eade1b3c 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -45,6 +45,7 @@
#include "agent/utils.h"
#include "cloud/cloud_delete_task.h"
+#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
#include "cloud/cloud_schema_change_job.h"
#include "common/config.h"
#include "common/logging.h"
@@ -1864,4 +1865,35 @@ void storage_medium_migrate_callback(StorageEngine&
engine, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}
+// Agent-task callback for CALCULATE_DELETE_BITMAP in cloud mode.
+// Runs a CloudEngineCalcDeleteBitmapTask for the request, then reports the resulting status
+// and the list of failed tablets through finish_task() and drops the task bookkeeping entry.
+void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
+    const auto& calc_req = req.calc_delete_bitmap_req;
+
+    // Output vectors filled in by the engine task; they must outlive `engine_task`.
+    std::vector<TTabletId> error_tablet_ids;
+    std::vector<TTabletId> succ_tablet_ids;
+    CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_req, &error_tablet_ids,
+                                                &succ_tablet_ids);
+    Status status = engine_task.execute();
+
+    if (!status) {
+        DorisMetrics::instance()->publish_task_failed_total->increment(1);
+        LOG_WARNING("failed to calculate delete bitmap")
+                .tag("signature", req.signature)
+                .tag("transaction_id", calc_req.transaction_id)
+                .tag("error_tablets_num", error_tablet_ids.size())
+                .error(status);
+    }
+
+    // The finish request is sent for both success and failure.
+    TFinishTaskRequest finish_task_request;
+    status.to_thrift(&finish_task_request.task_status);
+    finish_task_request.__set_backend(BackendOptions::get_local_backend());
+    finish_task_request.__set_task_type(req.task_type);
+    finish_task_request.__set_signature(req.signature);
+    finish_task_request.__set_report_version(s_report_version);
+    finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+
+    finish_task(finish_task_request);
+    remove_task_info(req.task_type, req.signature);
+}
+
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 531a64655a1..5c546582576 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -180,4 +180,6 @@ void report_disk_callback(StorageEngine& engine, const
TMasterInfo& master_info)
void report_tablet_callback(StorageEngine& engine, const TMasterInfo&
master_info);
+void calc_delete_bimtap_callback(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
+
} // namespace doris
diff --git a/be/src/cloud/cloud_delete_task.cpp
b/be/src/cloud/cloud_delete_task.cpp
index 2e3913533c3..7895799f9be 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -83,7 +83,15 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine,
const TPushReq& requ
tablet->fetch_add_approximate_num_rowsets(1);
tablet->fetch_add_approximate_cumu_num_rowsets(1);
- // TODO(liaoxin): set_tablet_txn_info if enable_unique_key_merge_on_write
+ // TODO(liaoxin) delete operator don't send calculate delete bitmap task
from fe,
+ // then we don't need to set_txn_related_delete_bitmap here.
+ if (tablet->enable_unique_key_merge_on_write()) {
+ DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(tablet->tablet_id());
+ RowsetIdUnorderedSet rowset_ids;
+ engine.txn_delete_bitmap_cache().set_tablet_txn_info(
+ request.transaction_id, tablet->tablet_id(), delete_bitmap,
rowset_ids, rowset,
+ request.timeout, nullptr);
+ }
return Status::OK();
}
diff --git a/be/src/cloud/cloud_delta_writer.cpp
b/be/src/cloud/cloud_delta_writer.cpp
index 9d549997cd7..f0b148448f7 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -114,4 +114,8 @@ Status CloudDeltaWriter::commit_rowset() {
return _engine.meta_mgr().commit_rowset(*rowset_meta(), true);
}
+// Forwards to the cloud rowset builder owned by this writer.
+Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
+    auto* builder = rowset_builder();
+    return builder->set_txn_related_delete_bitmap();
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_delta_writer.h
b/be/src/cloud/cloud_delta_writer.h
index 61a025130f0..c4153a075db 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -51,6 +51,8 @@ public:
Status commit_rowset();
+ Status set_txn_related_delete_bitmap();
+
private:
// Convert `_rowset_builder` from `BaseRowsetBuilder` to
`CloudRowsetBuilder`
CloudRowsetBuilder* rowset_builder();
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
new file mode 100644
index 00000000000..2ff9b8e91ba
--- /dev/null
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
+
+#include <memory>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "common/status.h"
+#include "olap/base_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
+#include "olap/tablet_fwd.h"
+#include "olap/tablet_meta.h"
+#include "olap/txn_manager.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
+ CloudStorageEngine& engine, const TCalcDeleteBitmapRequest&
cal_delete_bitmap_req,
+ std::vector<TTabletId>* error_tablet_ids, std::vector<TTabletId>*
succ_tablet_ids)
+ : _engine(engine),
+ _cal_delete_bitmap_req(cal_delete_bitmap_req),
+ _error_tablet_ids(error_tablet_ids),
+ _succ_tablet_ids(succ_tablet_ids) {}
+
+// Records a failed tablet under `_mutex`.
+// The aggregated result is replaced by `err` only while it is still OK or holds a
+// DELETE_BITMAP_LOCK_ERROR; any other error already stored is kept.
+void CloudEngineCalcDeleteBitmapTask::add_error_tablet_id(int64_t tablet_id, const Status& err) {
+    std::lock_guard<std::mutex> guard(_mutex);
+    _error_tablet_ids->push_back(tablet_id);
+    if (!_res.ok() && !_res.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
+        return;
+    }
+    _res = err;
+}
+
+// Records a tablet whose delete bitmap was handled successfully.
+// The constructor declares `succ_tablet_ids` with a default of nullptr, so a caller may
+// legitimately not ask for the success list; in that case there is nothing to record.
+void CloudEngineCalcDeleteBitmapTask::add_succ_tablet_id(int64_t tablet_id) {
+    // `_succ_tablet_ids` is only assigned in the constructor, so it can be tested
+    // before taking the lock.
+    if (_succ_tablet_ids == nullptr) {
+        return;
+    }
+    std::lock_guard<std::mutex> lck(_mutex);
+    _succ_tablet_ids->push_back(tablet_id);
+}
+
+// Schedules one CloudTabletCalcDeleteBitmapTask per tablet of every partition in the request
+// on the engine's tablet delete-bitmap thread pool and waits for all of them.
+//
+// Per tablet, before scheduling:
+//   - the tablet's rowsets are synced; INVALID_TABLET_STATE marks the tablet as succeeded and
+//     skips it, any other sync error is returned immediately;
+//   - the requested version must equal the tablet's max version + 1, otherwise a
+//     DELETE_BITMAP_LOCK_ERROR is recorded.
+// A recorded error stops the scan of the current partition's tablets (inner loop only).
+//
+// Returns the aggregated result `_res`, or the error of a failed tablet lookup / rowset sync.
+Status CloudEngineCalcDeleteBitmapTask::execute() {
+    int64_t transaction_id = _cal_delete_bitmap_req.transaction_id;
+    OlapStopWatch watch;
+    VLOG_NOTICE << "begin to calculate delete bitmap. transaction_id=" << transaction_id;
+    // NOTE(review): the early returns below leave without calling token->wait(); this
+    // presumably relies on the token's destructor draining submitted tasks - confirm.
+    std::unique_ptr<ThreadPoolToken> token =
+            _engine.calc_tablet_delete_bitmap_task_thread_pool()->new_token(
+                    ThreadPool::ExecutionMode::CONCURRENT);
+
+    // Tasks submitted to `token` in earlier iterations run concurrently with this loop and
+    // update `_error_tablet_ids` / `_res` through add_error_tablet_id() under `_mutex`.
+    // Every write to that shared state from this thread therefore takes `_mutex` as well.
+    for (const auto& partition : _cal_delete_bitmap_req.partitions) {
+        int64_t version = partition.version;
+        for (auto tablet_id : partition.tablet_ids) {
+            auto base_tablet = DORIS_TRY(_engine.get_tablet(tablet_id));
+            std::shared_ptr<CloudTablet> tablet =
+                    std::dynamic_pointer_cast<CloudTablet>(base_tablet);
+            if (tablet == nullptr) {
+                LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id="
+                             << tablet_id;
+                {
+                    std::lock_guard<std::mutex> lck(_mutex);
+                    _error_tablet_ids->push_back(tablet_id);
+                    _res = Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
+                            "can't get tablet when calculate delete bitmap. tablet_id={}",
+                            tablet_id);
+                }
+                break;
+            }
+
+            Status st = tablet->sync_rowsets();
+            if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
+                return st;
+            }
+            if (st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
+                add_succ_tablet_id(tablet->tablet_id());
+                LOG(INFO)
+                        << "tablet is under alter process, delete bitmap will be calculated later, "
+                           "tablet_id: "
+                        << tablet->tablet_id() << " txn_id: " << transaction_id
+                        << ", request_version=" << version;
+                continue;
+            }
+            int64_t max_version = tablet->max_version_unlocked();
+            if (version != max_version + 1) {
+                {
+                    std::lock_guard<std::mutex> lck(_mutex);
+                    _error_tablet_ids->push_back(tablet_id);
+                    _res = Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
+                            "version not continuous");
+                }
+                LOG(WARNING) << "version not continuous, current max version=" << max_version
+                             << ", request_version=" << version
+                             << " tablet_id=" << tablet->tablet_id();
+                break;
+            }
+
+            auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
+                    _engine, this, tablet, transaction_id, version);
+            // Capture only the task object; it already holds everything handle() needs.
+            auto submit_st = token->submit_func(
+                    [tablet_calc_delete_bitmap_ptr]() { tablet_calc_delete_bitmap_ptr->handle(); });
+            if (!submit_st.ok()) {
+                {
+                    std::lock_guard<std::mutex> lck(_mutex);
+                    _res = submit_st;
+                }
+                break;
+            }
+        }
+    }
+    // wait for all finished
+    token->wait();
+
+    // All submitted tasks are done here, so the shared state can be read without the lock.
+    LOG(INFO) << "finish to calculate delete bitmap on transaction."
+              << "transaction_id=" << transaction_id << ", cost(us): " << watch.get_elapse_time_us()
+              << ", error_tablet_size=" << _error_tablet_ids->size()
+              << ", res=" << _res.to_string();
+    return _res;
+}
+
+CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
+ CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask*
engine_task,
+ std::shared_ptr<CloudTablet> tablet, int64_t transaction_id, int64_t
version)
+ : _engine(engine),
+ _engine_calc_delete_bitmap_task(engine_task),
+ _tablet(tablet),
+ _transaction_id(transaction_id),
+ _version(version) {}
+
+// Calculates the delete bitmap of `_tablet` for `_transaction_id`.
+// Steps:
+//   1. fetch the transaction's rowset / delete bitmap / rowset-id snapshot from the engine's
+//      txn delete-bitmap cache;
+//   2. stamp the rowset with the version assigned by the request;
+//   3. call CloudTablet::update_delete_bitmap();
+//   4. report success or failure to the owning engine-level task.
+void CloudTabletCalcDeleteBitmapTask::handle() const {
+    RowsetSharedPtr rowset;
+    DeleteBitmapPtr delete_bitmap;
+    RowsetIdUnorderedSet rowset_ids;
+    std::shared_ptr<PartialUpdateInfo> partial_update_info;
+    // Out-parameter of the cache lookup; initialized so that a defined value is forwarded
+    // even if the lookup leaves it untouched.
+    int64_t txn_expiration = 0;
+    Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
+            _transaction_id, _tablet->tablet_id(), &rowset, &delete_bitmap, &rowset_ids,
+            &txn_expiration, &partial_update_info);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet->tablet_id()
+                     << ", txn_id=" << _transaction_id << ", status=" << status;
+        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet->tablet_id(), status);
+        return;
+    }
+
+    rowset->set_version(Version(_version, _version));
+    TabletTxnInfo txn_info;
+    txn_info.rowset = rowset;
+    txn_info.delete_bitmap = delete_bitmap;
+    txn_info.rowset_ids = rowset_ids;
+    txn_info.partial_update_info = partial_update_info;
+    status = CloudTablet::update_delete_bitmap(_tablet, &txn_info, _transaction_id, txn_expiration);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", status=" << status;
+        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet->tablet_id(), status);
+        return;
+    }
+
+    _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet->tablet_id());
+    LOG(INFO) << "calculate delete bitmap successfully on tablet"
+              << ", table_id=" << _tablet->table_id() << ", transaction_id=" << _transaction_id
+              << ", num_rows=" << rowset->num_rows() << ", res=" << status;
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
new file mode 100644
index 00000000000..514f1b059d5
--- /dev/null
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "gen_cpp/AgentService_types.h"
+#include "olap/tablet_fwd.h"
+#include "olap/task/engine_task.h"
+
+namespace doris {
+
+class CloudEngineCalcDeleteBitmapTask;
+
+// Unit of work that calculates the delete bitmap of a single tablet for one transaction.
+// CloudEngineCalcDeleteBitmapTask::execute() creates one instance per tablet and submits its
+// handle() to a thread pool.
+class CloudTabletCalcDeleteBitmapTask {
+public:
+ // `engine_task` is the owning engine-level task that receives the per-tablet outcome.
+ CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
+ CloudEngineCalcDeleteBitmapTask*
engine_task,
+ std::shared_ptr<CloudTablet> tablet,
int64_t transaction_id,
+ int64_t version);
+ ~CloudTabletCalcDeleteBitmapTask() = default;
+
+ // Looks up the transaction's info in the engine's txn delete-bitmap cache, updates the
+ // tablet's delete bitmap and reports success or failure to the engine-level task.
+ void handle() const;
+
+private:
+ CloudStorageEngine& _engine;
+ // Not owned; used only to report results via add_error_tablet_id()/add_succ_tablet_id().
+ CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
+
+ std::shared_ptr<CloudTablet> _tablet;
+ int64_t _transaction_id;
+ // Version assigned to the transaction's rowset before the bitmap is calculated.
+ int64_t _version;
+};
+
+// Engine task that handles one TCalcDeleteBitmapRequest: it fans the request out into one
+// CloudTabletCalcDeleteBitmapTask per tablet and aggregates their outcomes.
+class CloudEngineCalcDeleteBitmapTask : public EngineTask {
+public:
+ // `error_tablet_ids` and `succ_tablet_ids` are caller-owned output vectors.
+ // `succ_tablet_ids` defaults to nullptr; pass a vector to collect the ids of tablets
+ // that succeeded.
+ CloudEngineCalcDeleteBitmapTask(CloudStorageEngine& engine,
+ const TCalcDeleteBitmapRequest&
cal_delete_bitmap_req,
+ std::vector<TTabletId>* error_tablet_ids,
+ std::vector<TTabletId>* succ_tablet_ids =
nullptr);
+ // Submits the per-tablet tasks, waits for them and returns the aggregated status.
+ Status execute() override;
+
+ // Result reporting used by the per-tablet tasks; both take `_mutex`.
+ void add_error_tablet_id(int64_t tablet_id, const Status& err);
+ void add_succ_tablet_id(int64_t tablet_id);
+
+private:
+ CloudStorageEngine& _engine;
+ // Held by reference: the request must outlive this task.
+ const TCalcDeleteBitmapRequest& _cal_delete_bitmap_req;
+ // Taken by add_error_tablet_id()/add_succ_tablet_id() around their updates.
+ std::mutex _mutex;
+ std::vector<TTabletId>* _error_tablet_ids;
+ std::vector<TTabletId>* _succ_tablet_ids;
+
+ // Aggregated result returned by execute().
+ Status _res;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_rowset_builder.cpp
b/be/src/cloud/cloud_rowset_builder.cpp
index 2ac7f2e3337..13f49ae35a5 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -34,8 +34,11 @@ CloudRowsetBuilder::~CloudRowsetBuilder() = default;
Status CloudRowsetBuilder::init() {
_tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id));
- // TODO(plat1ko): get rowset ids snapshot to calculate delete bitmap
-
+ std::shared_ptr<MowContext> mow_context;
+ if (_tablet->enable_unique_key_merge_on_write()) {
+
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets());
+ RETURN_IF_ERROR(init_mow_context(mow_context));
+ }
RETURN_IF_ERROR(check_tablet_version_count());
// build tablet schema in request level
@@ -55,8 +58,7 @@ Status CloudRowsetBuilder::init() {
context.index_id = _req.index_id;
context.tablet = _tablet;
context.write_type = DataWriteType::TYPE_DIRECT;
- // TODO(plat1ko):
- // context.mow_context = mow_context;
+ context.mow_context = mow_context;
context.write_file_cache = _req.write_file_cache;
context.partial_update_info = _partial_update_info;
// New loaded data is always written to latest shared storage
@@ -104,4 +106,24 @@ const RowsetMetaSharedPtr&
CloudRowsetBuilder::rowset_meta() {
return _rowset_writer->rowset_meta();
}
+// Publishes this load's delete-bitmap state to the engine's txn delete-bitmap cache.
+// No-op for tablets without unique-key merge-on-write. When the correctness check is enabled
+// and the rowset is not empty, the delete bitmap is verified first and a failed check is
+// returned to the caller without touching the cache.
+Status CloudRowsetBuilder::set_txn_related_delete_bitmap() {
+    if (!_tablet->enable_unique_key_merge_on_write()) {
+        return Status::OK();
+    }
+
+    const bool run_check =
+            config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0;
+    if (run_check) {
+        auto check_st = _tablet->check_delete_bitmap_correctness(
+                _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids);
+        if (!check_st.ok()) {
+            LOG(WARNING) << fmt::format(
+                    "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] "
+                    "delete bitmap correctness check failed in commit phase!",
+                    _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(),
+                    _req.partition_id);
+            return check_st;
+        }
+    }
+
+    _engine.txn_delete_bitmap_cache().set_tablet_txn_info(
+            _req.txn_id, _tablet->tablet_id(), _delete_bitmap, _rowset_ids, _rowset,
+            _req.txn_expiration, _partial_update_info);
+    return Status::OK();
+}
} // namespace doris
diff --git a/be/src/cloud/cloud_rowset_builder.h
b/be/src/cloud/cloud_rowset_builder.h
index 0ce2cfa514d..05e24e66382 100644
--- a/be/src/cloud/cloud_rowset_builder.h
+++ b/be/src/cloud/cloud_rowset_builder.h
@@ -37,6 +37,8 @@ public:
const RowsetMetaSharedPtr& rowset_meta();
+ Status set_txn_related_delete_bitmap();
+
private:
// Convert `_tablet` from `BaseTablet` to `CloudTablet`
CloudTablet* cloud_tablet();
diff --git a/be/src/cloud/cloud_rowset_writer.cpp
b/be/src/cloud/cloud_rowset_writer.cpp
index f7e4f006194..98f7752b660 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -115,9 +115,4 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
return Status::OK();
}
-Status CloudRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
- // TODO(plat1ko)
- return Status::NotSupported("CloudRowsetWriter::_generate_delete_bitmap is
not implemented");
-}
-
} // namespace doris
diff --git a/be/src/cloud/cloud_rowset_writer.h
b/be/src/cloud/cloud_rowset_writer.h
index c8f0ed72d15..1bb2b3d38b8 100644
--- a/be/src/cloud/cloud_rowset_writer.h
+++ b/be/src/cloud/cloud_rowset_writer.h
@@ -30,9 +30,6 @@ public:
Status init(const RowsetWriterContext& rowset_writer_context) override;
Status build(RowsetSharedPtr& rowset) override;
-
-private:
- Status _generate_delete_bitmap(int32_t segment_id) override;
};
} // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 6eea0972d01..d82e6503c66 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -28,6 +28,7 @@
#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/config.h"
#include "io/fs/s3_file_system.h"
#include "olap/cumulative_compaction_policy.h"
@@ -99,6 +100,10 @@ Status CloudStorageEngine::open() {
_calc_delete_bitmap_executor =
std::make_unique<CalcDeleteBitmapExecutor>();
_calc_delete_bitmap_executor->init();
+ _txn_delete_bitmap_cache =
+
std::make_unique<CloudTxnDeleteBitmapCache>(config::delete_bitmap_agg_cache_capacity);
+ RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());
+
return Status::OK();
}
@@ -150,6 +155,12 @@ Status CloudStorageEngine::start_bg_threads() {
&_evict_quering_rowset_thread));
LOG(INFO) << "evict quering thread started";
+ // add calculate tablet delete bitmap task thread pool
+ RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool")
+ .set_min_threads(1)
+
.set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread)
+
.build(&_calc_tablet_delete_bitmap_task_thread_pool));
+
// TODO(plat1ko): check_bucket_enable_versioning_thread
// compaction tasks producer thread
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index c2841ac9ded..61e4ca9859b 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -24,7 +24,9 @@
//#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
+#include "cloud_txn_delete_bitmap_cache.h"
#include "olap/storage_engine.h"
+#include "util/threadpool.h"
namespace doris {
namespace cloud {
@@ -59,6 +61,11 @@ public:
CloudTabletMgr& tablet_mgr() { return *_tablet_mgr; }
+ // Returns the per-transaction delete bitmap cache. The pointer is dereferenced
+ // unconditionally; it is created and initialized in CloudStorageEngine::open().
+ CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() { return
*_txn_delete_bitmap_cache; }
+ // Returns the owning pointer of the thread pool that runs per-tablet delete bitmap tasks.
+ // The pool is built in start_bg_threads(); the pointer is empty before that.
+ std::unique_ptr<ThreadPool>& calc_tablet_delete_bitmap_task_thread_pool() {
+ return _calc_tablet_delete_bitmap_task_thread_pool;
+ }
+
io::FileSystemSPtr latest_fs() const {
std::lock_guard lock(_latest_fs_mtx);
return _latest_fs;
@@ -97,6 +104,8 @@ private:
std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
std::unique_ptr<CloudTabletMgr> _tablet_mgr;
+ std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
+ std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
// FileSystem with latest shared storage info, new data will be written to
this fs.
mutable std::mutex _latest_fs_mtx;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 7bd05c8720b..8e3932f7d24 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -24,20 +24,25 @@
#include <rapidjson/stringbuffer.h>
#include <atomic>
+#include <memory>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "io/cache/block/block_file_cache_factory.h"
#include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "olap/txn_manager.h"
namespace doris {
using namespace ErrorCode;
+static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
+
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr
tablet_meta)
: BaseTablet(std::move(tablet_meta)), _engine(engine) {
_tablet_path = remote_tablet_path(_tablet_meta->tablet_id());
@@ -351,6 +356,36 @@ Result<std::unique_ptr<RowsetWriter>>
CloudTablet::create_rowset_writer(
return RowsetFactory::create_rowset_writer(_engine, context, vertical);
}
+// Creates a transient rowset writer that appends segments to an existing rowset.
+// The writer reuses `rowset`'s rowset id and a copy of its schema, and starts numbering new
+// segments after the rowset's existing ones; the caller merges the transient rowset back into
+// the original afterwards.
+//
+// `partial_update_info` is moved into the writer context; `txn_expiration` is forwarded as is.
+// Returns the writer, or the error produced by RowsetFactory::create_rowset_writer().
+Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
+        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
+        int64_t txn_expiration) {
+    RowsetWriterContext context;
+    context.rowset_state = PREPARED;
+    context.segments_overlap = OVERLAPPING;
+    context.tablet_schema = std::make_shared<TabletSchema>();
+    context.tablet_schema->copy_from(*(rowset.tablet_schema()));
+    context.newest_write_timestamp = UnixSeconds();
+    // The original code first stored table_id() into `tablet_id` and overwrote it a few lines
+    // later; only the tablet id assignment below is kept.
+    context.enable_segcompaction = false;
+    context.write_type = DataWriteType::TYPE_DIRECT;
+    context.partial_update_info = std::move(partial_update_info);
+    context.is_transient_rowset_writer = true;
+    context.rowset_id = rowset.rowset_id();
+    context.tablet_id = tablet_id();
+    context.index_id = index_id();
+    context.partition_id = partition_id();
+    context.rowset_dir = remote_tablet_path(tablet_id());
+    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
+    context.txn_expiration = txn_expiration;
+    return RowsetFactory::create_rowset_writer(_engine, context, false)
+            .transform([&](auto&& writer) {
+                // New segments continue after the ones the rowset already has.
+                writer->set_segment_start_id(rowset.num_segments());
+                return writer;
+            });
+}
+
int64_t CloudTablet::get_cloud_base_compaction_score() const {
return _approximate_num_rowsets.load(std::memory_order_relaxed) -
_approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
@@ -476,4 +511,36 @@ std::vector<RowsetSharedPtr>
CloudTablet::pick_candidate_rowsets_to_full_compact
return pick_candidate_rowsets_to_single_replica_compaction();
}
+// Returns the delete bitmap executor provided by the cloud storage engine.
+CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
+    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
+    return executor;
+}
+
+// Persists the delete bitmap calculated for transaction `txn_id`.
+//   1. Refreshes the cached txn info with `delete_bitmap` / `cur_rowset_ids`, so a retry does
+//      not have to recalculate.
+//   2. For a partial update that produced rows in `rowset_writer`, updates the temporary
+//      rowset meta through the meta manager.
+//   3. Re-keys every bitmap entry to the rowset's start version and sends the result to the
+//      meta manager.
+// Returns the first error from the meta manager, otherwise OK.
+Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
+                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
+                                       const RowsetIdUnorderedSet& cur_rowset_ids) {
+    RowsetSharedPtr rowset = txn_info->rowset;
+    const int64_t cur_version = rowset->start_version();
+
+    // update delete bitmap info, in order to avoid recalculation when trying again
+    _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
+                                                             cur_rowset_ids);
+
+    const auto& partial_info = txn_info->partial_update_info;
+    if (partial_info && partial_info->is_partial_update && rowset_writer->num_rows() > 0) {
+        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset->rowset_meta()));
+    }
+
+    // Same (rowset id, segment id) pairs as the input, but all keyed at `cur_version`.
+    auto versioned_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
+    for (const auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
+        versioned_bitmap->merge({std::get<0>(key), std::get<1>(key), cur_version}, bitmap);
+    }
+
+    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
+            *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, versioned_bitmap.get()));
+
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index a9fce6dda5f..37ba38c6db3 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -17,7 +17,10 @@
#pragma once
+#include <memory>
+
#include "olap/base_tablet.h"
+#include "olap/partial_update_info.h"
namespace doris {
@@ -172,6 +175,16 @@ public:
std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
std::mutex& get_cumulative_compaction_lock() { return
_cumulative_compaction_lock; }
+ Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
+ const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info,
+ int64_t txn_expiration = 0) override;
+
+ CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
+
+ Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
+ DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
+ const RowsetIdUnorderedSet& cur_rowset_ids)
override;
+
int64_t last_sync_time_s = 0;
int64_t last_load_time_ms = 0;
int64_t last_base_compaction_success_time_ms = 0;
diff --git a/be/src/cloud/cloud_tablets_channel.cpp
b/be/src/cloud/cloud_tablets_channel.cpp
index 07c247ac809..046916aa9a0 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -235,7 +235,16 @@ Status CloudTabletsChannel::close(LoadChannel* parent,
const PTabletWriterAddBlo
}
}
- // TODO(plat1ko): 6. set txn related delete bitmap if necessary
+ // 6. set txn related delete bitmap if necessary
+ for (auto it = writers_to_commit.begin(); it != writers_to_commit.end();) {
+ auto st = (*it)->set_txn_related_delete_bitmap();
+ if (!st.ok()) {
+ _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+ _close_status = std::move(st);
+ return _close_status;
+ }
+ it++;
+ }
tablet_vec->Reserve(writers_to_commit.size());
for (auto* writer : writers_to_commit) {
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
new file mode 100644
index 00000000000..08bc035770a
--- /dev/null
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_txn_delete_bitmap_cache.h"
+
+#include <fmt/core.h>
+
+#include <chrono>
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "olap/olap_common.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+// Builds the LRU-backed cache for transaction delete bitmaps with a capacity of
+// `size_in_bytes` (cache type SIZE) and initializes the stop latch with a count
+// of 1; the destructor counts it down to end the cleaner thread's wait loop.
+// NOTE(review): 86400 and 4 are presumably the stale-sweep time in seconds and
+// the number of shards expected by LRUCachePolicy -- confirm against its ctor.
+CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
+ :
LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE,
size_in_bytes,
+ LRUCacheType::SIZE, 86400, 4),
+ _stop_latch(1) {}
+
+// Stops the background cleaner: counts down the stop latch so that
+// `_clean_thread_callback` leaves its wait loop, then joins the thread.
+// `_clean_thread` is only populated by `init()`, so it is checked before joining;
+// otherwise destroying a cache whose `init()` was never called (or whose thread
+// creation failed) would dereference a null pointer.
+CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
+    _stop_latch.count_down();
+    if (_clean_thread.get() != nullptr) {
+        _clean_thread->join();
+    }
+}
+
+// Spawns the thread that runs `_clean_thread_callback` and stores it in
+// `_clean_thread`. Returns the status of the thread creation; a failure is
+// additionally reported as a warning in the log.
+Status CloudTxnDeleteBitmapCache::init() {
+    auto create_status = Thread::create(
+            "CloudTxnDeleteBitmapCache", "clean_txn_dbm_thread",
+            [this]() { _clean_thread_callback(); }, &_clean_thread);
+    if (create_status.ok()) {
+        return create_status;
+    }
+    LOG(WARNING) << "failed to create thread for CloudTxnDeleteBitmapCache, error: "
+                 << create_status;
+    return create_status;
+}
+
+// Fetches the state recorded for (transaction_id, tablet_id).
+//
+// The rowset, expiration time and partial update info come from `_txn_map`
+// (read under the shared lock); if no entry exists a NOT_FOUND status is
+// returned and no output parameter is written.
+//
+// The delete bitmap and the rowset ids it was calculated against come from the
+// LRU cache. Since the cache may have evicted the value, a miss is not an
+// error: `*delete_bitmap` is set to a new empty bitmap and `*rowset_ids` is
+// emptied, so that the whole delete bitmap gets recalculated by the caller.
+Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
+        TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
+        DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
+        std::shared_ptr<PartialUpdateInfo>* partial_update_info) {
+    {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        TxnKey key(transaction_id, tablet_id);
+        auto iter = _txn_map.find(key);
+        if (iter == _txn_map.end()) {
+            return Status::Error<ErrorCode::NOT_FOUND, false>(
+                    "not found txn info, tablet_id={}, transaction_id={}", tablet_id,
+                    transaction_id);
+        }
+        *rowset = iter->second.rowset;
+        *txn_expiration = iter->second.txn_expiration;
+        *partial_update_info = iter->second.partial_update_info;
+    }
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey key(key_str);
+    Cache::Handle* handle = cache()->lookup(key);
+
+    if (handle != nullptr) {
+        auto* val = reinterpret_cast<DeleteBitmapCacheValue*>(cache()->value(handle));
+        const bool hit = (val != nullptr);
+        if (hit) {
+            *delete_bitmap = val->delete_bitmap;
+            *rowset_ids = val->rowset_ids;
+        }
+        // must call release handle to reduce the reference count,
+        // otherwise there will be memory leak. This is done for every
+        // non-null handle, even if it carries no value.
+        cache()->release(handle);
+        if (hit) {
+            return Status::OK();
+        }
+    }
+
+    LOG_INFO("cache missed when get delete bitmap")
+            .tag("txn_id", transaction_id)
+            .tag("tablet_id", tablet_id);
+    // Because the rowset_ids become empty, all delete bitmaps
+    // will be recalculated in CalcDeleteBitmapTask. Clear the output
+    // explicitly so that this holds even if the caller passed a non-empty set.
+    rowset_ids->clear();
+    *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+    return Status::OK();
+}
+
+// Records the state of (transaction_id, tablet_id).
+//
+// The rowset, expiration time and partial update info are stored in `_txn_map`
+// and the expiration is indexed in `_expiration_txn` (both under the write
+// lock); the delete bitmap together with `rowset_ids` is inserted into the LRU
+// cache, charged with the serialized size of its bitmaps.
+// A non-positive `txn_expiration` is replaced by "now + 120 seconds".
+void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
+        TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
+        const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
+        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
+    if (txn_expiration <= 0) {
+        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+        txn_expiration =
+                std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count() + 120;
+    }
+    {
+        std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        const TxnKey map_key(transaction_id, tablet_id);
+        _txn_map[map_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info));
+        _expiration_txn.emplace(txn_expiration, map_key);
+    }
+
+    std::string cache_key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey cache_key(cache_key_str);
+    auto* cache_value = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
+
+    // The charge is the size of the value struct plus every bitmap it references.
+    size_t charge = sizeof(DeleteBitmapCacheValue);
+    for (auto& entry : cache_value->delete_bitmap->delete_bitmap) {
+        charge += entry.second.getSizeInBytes();
+    }
+
+    // The cache owns the value from here on and reclaims it through this deleter.
+    auto value_deleter = [](const CacheKey&, void* value) {
+        delete static_cast<DeleteBitmapCacheValue*>(value);
+    };
+    auto handle =
+            cache()->insert(cache_key, cache_value, charge, value_deleter, CachePriority::NORMAL);
+    // The returned handle holds a reference; drop it right away, otherwise the
+    // entry would never become evictable and its memory would leak.
+    cache()->release(handle);
+
+    LOG_INFO("set txn related delete bitmap")
+            .tag("txn_id", transaction_id)
+            .tag("expiration", txn_expiration)
+            .tag("tablet_id", tablet_id)
+            .tag("delete_bitmap_size", charge);
+}
+
+// Replaces the cached delete bitmap and rowset ids of (transaction_id, tablet_id)
+// by inserting a new value under the same cache key. Only the LRU cache is
+// touched; `_txn_map` and `_expiration_txn` are left as they are.
+void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
+                                                       int64_t tablet_id,
+                                                       DeleteBitmapPtr delete_bitmap,
+                                                       const RowsetIdUnorderedSet& rowset_ids) {
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey key(key_str);
+
+    auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
+    auto deleter = [](const CacheKey&, void* value) {
+        delete static_cast<DeleteBitmapCacheValue*>(value); // Just delete to reclaim
+    };
+    // The charge is the size of the value struct plus every bitmap it references.
+    size_t charge = sizeof(DeleteBitmapCacheValue);
+    for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
+        charge += v.getSizeInBytes();
+    }
+    auto handle = cache()->insert(key, val, charge, deleter, CachePriority::NORMAL);
+    // must call release handle to reduce the reference count,
+    // otherwise there will be memory leak
+    cache()->release(handle);
+    // The tag key is spelled "tablet_id" (it used to be "tablt_id") so that it
+    // matches the key used by set_tablet_txn_info and log filters catch both.
+    LOG_INFO("update txn related delete bitmap")
+            .tag("txn_id", transaction_id)
+            .tag("tablet_id", tablet_id)
+            .tag("delete_bitmap_size", charge);
+}
+
+// Drops every txn entry whose expiration time (seconds since epoch) has passed.
+//
+// `_expiration_txn` is ordered by expiration time, so entries are consumed from
+// the front until the first one that is still in the future. An expiration
+// entry only removes the txn from `_txn_map` and the LRU cache if the txn is
+// still present and still carries that very expiration time; otherwise the
+// entry is stale (the txn was re-registered with another expiration, or was
+// already removed through a duplicate entry) and only the entry itself is
+// discarded. The whole pass runs under the write lock.
+void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
+    TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    while (!_expiration_txn.empty()) {
+        auto iter = _expiration_txn.begin();
+        int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+                                       std::chrono::system_clock::now().time_since_epoch())
+                                       .count();
+        if (iter->first > current_time) {
+            break;
+        }
+        auto txn_iter = _txn_map.find(iter->second);
+        // set_tablet_txn_info() adds an expiration entry on every call, so several
+        // entries may point to the same txn key. Once the first of them erased
+        // the txn, find() returns end() for the others and must not be
+        // dereferenced.
+        if (txn_iter != _txn_map.end() && iter->first == txn_iter->second.txn_expiration) {
+            LOG_INFO("clean expired delete bitmap")
+                    .tag("txn_id", txn_iter->first.txn_id)
+                    .tag("expiration", txn_iter->second.txn_expiration)
+                    .tag("tablet_id", txn_iter->first.tablet_id);
+            std::string key_str = std::to_string(txn_iter->first.txn_id) + "/" +
+                                  std::to_string(txn_iter->first.tablet_id); // Cache key container
+            CacheKey cache_key(key_str);
+            cache()->erase(cache_key);
+            _txn_map.erase(txn_iter);
+        }
+        _expiration_txn.erase(iter);
+    }
+}
+
+// Body of the cleaner thread: evicts expired txn entries, then waits on the
+// stop latch for up to 300 seconds. The loop ends as soon as the wait reports
+// that the latch was released (which the destructor does).
+void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
+    while (true) {
+        remove_expired_tablet_txn_info();
+        const bool stop_requested = _stop_latch.wait_for(std::chrono::seconds(300));
+        if (stop_requested) {
+            break;
+        }
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
new file mode 100644
index 00000000000..c84e19a765f
--- /dev/null
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+
+#include "olap/lru_cache.h"
+#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
+#include "olap/rowset/rowset.h"
+#include "olap/tablet_meta.h"
+#include "util/countdown_latch.h"
+
+namespace doris {
+
+// Record transaction related delete bitmaps using a lru cache.
+//
+// Per (txn id, tablet id) two pieces of state are kept:
+//  - `_txn_map` holds the rowset, the expiration time and the partial update
+//    info; it is guarded by `_rwlock`.
+//  - the LRU cache holds the delete bitmap and the rowset ids it was calculated
+//    against; this part may be evicted at any time.
+// A background thread started by `init()` removes expired entries.
+class CloudTxnDeleteBitmapCache : public LRUCachePolicy {
+public:
+    // `size_in_bytes` is the capacity handed to the underlying LRU cache.
+    CloudTxnDeleteBitmapCache(size_t size_in_bytes);
+
+    // Signals the cleaner thread to stop and joins it.
+    ~CloudTxnDeleteBitmapCache() override;
+
+    // Starts the cleaner thread; returns the status of the thread creation.
+    Status init();
+
+    // Reads the state of (transaction_id, tablet_id) into the output parameters.
+    // Returns NOT_FOUND if the txn is not registered. If only the cached delete
+    // bitmap is gone, OK is returned and `*delete_bitmap` is a new empty bitmap.
+    Status get_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
+                               RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap,
+                               RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
+                               std::shared_ptr<PartialUpdateInfo>* partial_update_info);
+
+    // Registers (or overwrites) the state of (transaction_id, tablet_id).
+    // `txn_expiration` is in seconds since epoch; a value <= 0 means
+    // "120 seconds from now".
+    void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
+                             DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids,
+                             RowsetSharedPtr rowset, int64_t txn_expiration,
+                             std::shared_ptr<PartialUpdateInfo> partial_update_info);
+
+    // Replaces only the cached delete bitmap and rowset ids of the txn.
+    void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
+                                DeleteBitmapPtr delete_bitmap,
+                                const RowsetIdUnorderedSet& rowset_ids);
+
+    // Removes all entries whose expiration time has passed.
+    void remove_expired_tablet_txn_info();
+
+private:
+    // Loop executed by `_clean_thread`.
+    void _clean_thread_callback();
+
+    // Value stored in the LRU cache.
+    struct DeleteBitmapCacheValue {
+        DeleteBitmapPtr delete_bitmap;
+        // records rowsets calc in commit txn
+        RowsetIdUnorderedSet rowset_ids;
+
+        DeleteBitmapCacheValue(DeleteBitmapPtr delete_bitmap_, const RowsetIdUnorderedSet& ids_)
+                : delete_bitmap(std::move(delete_bitmap_)), rowset_ids(ids_) {}
+    };
+
+    // Key of `_txn_map`; ordered by txn id, then tablet id.
+    struct TxnKey {
+        TTransactionId txn_id;
+        int64_t tablet_id;
+        TxnKey(TTransactionId txn_id_, int64_t tablet_id_)
+                : txn_id(txn_id_), tablet_id(tablet_id_) {}
+        auto operator<=>(const TxnKey&) const = default;
+    };
+
+    // Value of `_txn_map`.
+    struct TxnVal {
+        RowsetSharedPtr rowset;
+        // seconds since epoch
+        int64_t txn_expiration;
+        std::shared_ptr<PartialUpdateInfo> partial_update_info;
+        TxnVal() : txn_expiration(0) {}
+        TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
+               std::shared_ptr<PartialUpdateInfo> partial_update_info_)
+                : rowset(std::move(rowset_)),
+                  txn_expiration(txn_expiration_),
+                  partial_update_info(std::move(partial_update_info_)) {}
+    };
+
+    std::map<TxnKey, TxnVal> _txn_map;
+    // expiration time -> txn key, used to find expired txns in time order
+    std::multimap<int64_t, TxnKey> _expiration_txn;
+    // guards `_txn_map` and `_expiration_txn`
+    std::shared_mutex _rwlock;
+    scoped_refptr<Thread> _clean_thread;
+    // counted down by the destructor to stop `_clean_thread`
+    CountDownLatch _stop_latch;
+};
+
+} // namespace doris
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b30191d9e17..a08fdd5a6cc 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -156,6 +156,10 @@ DEFINE_Int32(tablet_publish_txn_max_thread, "32");
DEFINE_Int32(publish_version_task_timeout_s, "8");
// the count of thread to calc delete bitmap
DEFINE_Int32(calc_delete_bitmap_max_thread, "32");
+// the count of thread to calc delete bitmap worker, only used for cloud
+DEFINE_Int32(calc_delete_bitmap_worker_count, "8");
+// the count of thread to calc tablet delete bitmap task, only used for cloud
+DEFINE_Int32(calc_tablet_delete_bitmap_task_max_thread, "32");
// the count of thread to clear transaction task
DEFINE_Int32(clear_transaction_task_worker_count, "1");
// the count of thread to delete
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0160919e66d..81685bc1e5c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -204,6 +204,10 @@ DECLARE_Int32(tablet_publish_txn_max_thread);
DECLARE_Int32(publish_version_task_timeout_s);
// the count of thread to calc delete bitmap
DECLARE_Int32(calc_delete_bitmap_max_thread);
+// the count of thread to calc delete bitmap worker, only used for cloud
+DECLARE_Int32(calc_delete_bitmap_worker_count);
+// the count of thread to calc tablet delete bitmap task, only used for cloud
+DECLARE_Int32(calc_tablet_delete_bitmap_task_max_thread);
// the count of thread to clear transaction task
DECLARE_Int32(clear_transaction_task_worker_count);
// the count of thread to delete
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 09960fd0c03..c4ce206d8c5 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -18,7 +18,9 @@
#include "olap/base_tablet.h"
#include <fmt/format.h>
+#include <rapidjson/prettywriter.h>
+#include "common/status.h"
#include "olap/calc_delete_bitmap_executor.h"
#include "olap/delete_bitmap_calculator.h"
#include "olap/memtable.h"
@@ -27,8 +29,10 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet_fwd.h"
+#include "olap/txn_manager.h"
#include "service/point_query_executor.h"
#include "util/bvar_helper.h"
+#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "vec/common/schema_util.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -45,6 +49,7 @@ bvar::LatencyRecorder
g_tablet_lookup_rowkey_latency("doris_pk", "tablet_lookup_
bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found");
bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
"doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60);
+bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk",
"update_delete_bitmap");
// read columns by read plan
// read_index: ori_pos-> block_idx
@@ -1059,4 +1064,200 @@ Status BaseTablet::_capture_consistent_rowsets_unlocked(
return Status::OK();
}
+// Verifies that `delete_bitmap` contains a sentinel mark, i.e. an entry with key
+// {rowset id, INVALID_SEGMENT_ID, TEMP_VERSION_COMMON}, for every id in
+// `rowset_ids`.
+//
+// Returns OK if no mark is missing. Otherwise a warning plus a JSON report
+// (the required rowsets and the ids without a mark) is logged, debug builds
+// abort via DCHECK, and an InternalError is returned.
+// `rowsets` optionally supplies the rowsets to list as "required" in the
+// report; if it is null they are looked up by `rowset_ids` under the meta lock.
+Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap,
+                                                   int64_t max_version, int64_t txn_id,
+                                                   const RowsetIdUnorderedSet& rowset_ids,
+                                                   std::vector<RowsetSharedPtr>* rowsets) {
+    RowsetIdUnorderedSet missing_ids;
+    for (const auto& rowsetid : rowset_ids) {
+        const bool has_sentinel = delete_bitmap->delete_bitmap.contains(
+                {rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON});
+        if (!has_sentinel) {
+            missing_ids.insert(rowsetid);
+        }
+    }
+    if (missing_ids.empty()) {
+        return Status::OK();
+    }
+
+    LOG(WARNING) << "[txn_id:" << txn_id << "][tablet_id:" << tablet_id()
+                 << "][max_version: " << max_version
+                 << "] check delete bitmap correctness failed!";
+    rapidjson::Document root;
+    root.SetObject();
+    rapidjson::Document required_rowsets_arr;
+    required_rowsets_arr.SetArray();
+    rapidjson::Document missing_rowsets_arr;
+    missing_rowsets_arr.SetArray();
+
+    // Use the rowsets handed in by the caller; fall back to a lookup by id.
+    std::vector<RowsetSharedPtr> looked_up_rowsets;
+    const std::vector<RowsetSharedPtr>* required_rowsets = rowsets;
+    if (required_rowsets == nullptr) {
+        std::shared_lock meta_rlock(_meta_lock);
+        looked_up_rowsets = get_rowset_by_ids(&rowset_ids);
+        required_rowsets = &looked_up_rowsets;
+    }
+    for (const auto& required : *required_rowsets) {
+        rapidjson::Value value;
+        std::string version_str = required->get_rowset_info_str();
+        value.SetString(version_str.c_str(), version_str.length(),
+                        required_rowsets_arr.GetAllocator());
+        required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator());
+    }
+    for (const auto& missing_rowset_id : missing_ids) {
+        rapidjson::Value miss_value;
+        std::string rowset_id_str = missing_rowset_id.to_string();
+        miss_value.SetString(rowset_id_str.c_str(), rowset_id_str.length(),
+                             missing_rowsets_arr.GetAllocator());
+        missing_rowsets_arr.PushBack(miss_value, missing_rowsets_arr.GetAllocator());
+    }
+
+    root.AddMember("required_rowsets", required_rowsets_arr, root.GetAllocator());
+    root.AddMember("missing_rowsets", missing_rowsets_arr, root.GetAllocator());
+    rapidjson::StringBuffer strbuf;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+    root.Accept(writer);
+    std::string rowset_status_string = std::string(strbuf.GetString());
+    LOG_EVERY_SECOND(WARNING) << rowset_status_string;
+    // let it crash if correctness check failed in Debug mode
+    DCHECK(false) << "delete bitmap correctness check failed in publish phase!";
+    return Status::InternalError("check delete bitmap failed!");
+}
+
+// Erases every entry of `delete_bitmap` whose segment id (second element of the
+// key) is DeleteBitmap::INVALID_SEGMENT_ID, i.e. the sentinel marks.
+void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap) {
+    auto& bitmap_entries = delete_bitmap->delete_bitmap;
+    auto cursor = bitmap_entries.begin();
+    while (cursor != bitmap_entries.end()) {
+        const bool is_sentinel = std::get<1>(cursor->first) == DeleteBitmap::INVALID_SEGMENT_ID;
+        if (is_sentinel) {
+            // erase() hands back the iterator following the removed entry
+            cursor = bitmap_entries.erase(cursor);
+            continue;
+        }
+        ++cursor;
+    }
+}
+
+// Brings the delete bitmap of the rowset in `txn_info` up to date with the
+// rowsets visible at version `rowset->start_version() - 1` and hands the
+// result to `save_delete_bitmap()` of the tablet.
+//
+// Steps:
+//  1. create a transient rowset writer (`txn_expiration` is passed through) and
+//     load the segments of the rowset;
+//  2. return OK early, without any calculation, if the tablet is in state
+//     TABLET_NOTREADY;
+//  3. compare the current rowset ids with `txn_info->rowset_ids`: marks of
+//     rowsets that disappeared are removed from the bitmap, rowsets that were
+//     added are the ones the delete bitmap is calculated against;
+//  4. optionally run the merge-on-write correctness check and strip the
+//     sentinel marks;
+//  5. for partial updates, flush and build the transient rowset and merge its
+//     meta into the rowset;
+//  6. call `save_delete_bitmap()`.
+//
+// Returns the first error of any of these steps. `txn_id` is used for logging
+// and is passed on to `save_delete_bitmap()`.
+Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, const TabletTxnInfo* txn_info,
+                                        int64_t txn_id, int64_t txn_expiration) {
+    SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
+    RowsetIdUnorderedSet cur_rowset_ids;
+    RowsetIdUnorderedSet rowset_ids_to_add;
+    RowsetIdUnorderedSet rowset_ids_to_del;
+    RowsetSharedPtr rowset = txn_info->rowset;
+    int64_t cur_version = rowset->start_version();
+
+    auto rowset_writer = DORIS_TRY(self->create_transient_rowset_writer(
+            *rowset, txn_info->partial_update_info, txn_expiration));
+
+    DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
+    // Partial update might generate new segments when there is conflicts while publish, and mark
+    // the same key in original segments as delete.
+    // When the new segment flush fails or the rowset build fails, the deletion marker for the
+    // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`,
+    // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it.
+    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
+        delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
+    }
+
+    OlapStopWatch watch;
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+    auto t1 = watch.get_elapse_time_us();
+
+    {
+        std::shared_lock meta_rlock(self->_meta_lock);
+        // tablet is under alter process. The delete bitmap will be calculated after conversion.
+        if (self->tablet_state() == TABLET_NOTREADY) {
+            LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
+                      << self->tablet_id();
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
+    }
+    auto t2 = watch.get_elapse_time_us();
+
+    _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, &rowset_ids_to_add,
+                           &rowset_ids_to_del);
+    // Drop everything that was recorded for rowsets which no longer exist.
+    for (const auto& to_del : rowset_ids_to_del) {
+        delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
+    }
+
+    std::vector<RowsetSharedPtr> specified_rowsets;
+    {
+        std::shared_lock meta_rlock(self->_meta_lock);
+        specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add);
+    }
+    auto t3 = watch.get_elapse_time_us();
+
+    // When there is only one segment, it will be calculated in the current thread.
+    // Otherwise, it will be submitted to the thread pool for calculation.
+    if (segments.size() <= 1) {
+        RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
+                                           cur_version - 1, nullptr, rowset_writer.get()));
+
+    } else {
+        auto token = self->calc_delete_bitmap_executor()->create_token();
+        RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
+                                           cur_version - 1, token.get(), rowset_writer.get()));
+        RETURN_IF_ERROR(token->wait());
+    }
+
+    // Only print the per-step breakdown when the whole operation took >= 1s.
+    std::stringstream ss;
+    if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
+        ss << "cost: " << watch.get_elapse_time_us() - t3 << "(us)";
+    } else {
+        ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - t1
+           << ", get rowsets: " << t3 - t2
+           << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")";
+    }
+
+    // The initial value must be a size_t: std::accumulate uses the type of the
+    // initial value as accumulator type, so a plain `0` would sum up in an int
+    // and truncate large row counts.
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), size_t {0},
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); });
+    LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id()
+              << ", rowset_ids to add: " << rowset_ids_to_add.size()
+              << ", rowset_ids to del: " << rowset_ids_to_del.size()
+              << ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
+              << ss.str() << " , total rows: " << total_rows;
+
+    if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) {
+        // only do correctness check if the rowset has at least one row written
+        // check if all the rowset has ROWSET_SENTINEL_MARK
+        auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1,
+                                                        cur_rowset_ids, &specified_rowsets);
+        // A failed check is only reported, it does not fail the update.
+        if (!st.ok()) {
+            LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!");
+        }
+        self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
+    }
+
+    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
+        DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", {
+            if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+                LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random failed")
+                        .tag("txn_id", txn_id);
+                return Status::InternalError(
+                        "debug update_delete_bitmap partial update write rowset random failed");
+            }
+        });
+        // build rowset writer and merge transient rowset
+        RETURN_IF_ERROR(rowset_writer->flush());
+        RowsetSharedPtr transient_rowset;
+        RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
+        rowset->merge_rowset_meta(transient_rowset->rowset_meta());
+
+        // erase segment cache cause we will add a segment to rowset
+        SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
+    }
+
+    RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap, rowset_writer.get(),
+                                             cur_rowset_ids));
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 943c257de64..837ff28d461 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
#include <string>
#include "common/status.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
@@ -178,6 +179,10 @@ public:
static void add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap,
const RowsetIdUnorderedSet&
rowsetids);
+ Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap,
int64_t max_version,
+ int64_t txn_id, const
RowsetIdUnorderedSet& rowset_ids,
+ std::vector<RowsetSharedPtr>*
rowsets = nullptr);
+
static Status generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const std::vector<uint32>&
missing_cids,
const std::vector<uint32>& update_cids, const
PartialUpdateReadPlan& read_plan_ori,
@@ -198,6 +203,18 @@ public:
const std::vector<uint32_t>& rowids,
const TabletColumn& tablet_column,
vectorized::MutableColumnPtr& dst);
+
+ virtual Result<std::unique_ptr<RowsetWriter>>
create_transient_rowset_writer(
+ const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info,
+ int64_t txn_expiration = 0) = 0;
+
+ static Status update_delete_bitmap(const BaseTabletSPtr& self, const
TabletTxnInfo* txn_info,
+ int64_t txn_id, int64_t txn_expiration
= 0);
+
+ virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
+ DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
+ const RowsetIdUnorderedSet&
cur_rowset_ids) = 0;
+ virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
////////////////////////////////////////////////////////////////////////////
// end MoW functions
////////////////////////////////////////////////////////////////////////////
@@ -216,6 +233,7 @@ protected:
static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur,
const RowsetIdUnorderedSet& pre,
RowsetIdUnorderedSet* to_add,
RowsetIdUnorderedSet* to_del);
+ static void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr
delete_bitmap);
Status _capture_consistent_rowsets_unlocked(const std::vector<Version>&
version_path,
std::vector<RowsetSharedPtr>*
rowsets) const;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8556e19483f..f58daf6816a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -166,7 +166,7 @@ Status BaseBetaRowsetWriter::add_block(const
vectorized::Block* block) {
return _segment_creator.add_block(block);
}
-Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
+Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
SCOPED_RAW_TIMER(&_delete_bitmap_ns);
if (!_context.tablet->enable_unique_key_merge_on_write() ||
(_context.partial_update_info &&
_context.partial_update_info->is_partial_update)) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 3681e0fe120..cffb951451e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -125,12 +125,11 @@ public:
}
private:
- virtual Status _generate_delete_bitmap(int32_t segment_id) = 0;
-
void update_rowset_schema(TabletSchemaSPtr flush_schema);
// build a tmp rowset for load segment to calc delete_bitmap
// for this segment
protected:
+ Status _generate_delete_bitmap(int32_t segment_id);
Status _build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num
= false);
Status _create_file_writer(std::string path, io::FileWriterPtr&
file_writer);
virtual Status _close_file_writers();
@@ -193,8 +192,6 @@ public:
KeyBoundsPB& key_bounds);
private:
- Status _generate_delete_bitmap(int32_t segment_id) override;
-
// segment compaction
friend class SegcompactionWorker;
Status _close_file_writers() override;
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index bc946b3e06e..781510f2a21 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -75,6 +75,10 @@ void Rowset::make_visible(Version version) {
}
}
+// Sets the version of this rowset by forwarding `version` to the rowset meta.
+void Rowset::set_version(Version version) {
+ _rowset_meta->set_version(version);
+}
+
bool Rowset::check_rowset_segment() {
std::lock_guard load_lock(_lock);
return check_current_rowset_segment();
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index d9da9907405..b95aad700ae 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -138,6 +138,7 @@ public:
// publish rowset to make it visible to read
void make_visible(Version version);
+ void set_version(Version version);
const TabletSchemaSPtr& tablet_schema() const { return _schema; }
// helper class to access RowsetMeta
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 0dfaf4b7466..235a698021b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -344,11 +344,6 @@ void
SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
// 3. set columns to data convertor and then write all columns
Status SegmentWriter::append_block_with_partial_content(const
vectorized::Block* block,
size_t row_pos, size_t
num_rows) {
- if (config::is_cloud_mode()) {
- // TODO(plat1ko): cloud mode
- return Status::NotSupported("append_block_with_partial_content");
- }
-
auto* tablet = static_cast<Tablet*>(_tablet.get());
if (block->columns() <= _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 1e3ff116783..16d5e60b09a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -290,11 +290,6 @@ void
VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
// 2.3 fill block
// 3. set columns to data convertor and then write all columns
Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock&
data) {
- if (config::is_cloud_mode()) {
- // TODO(plat1ko): CloudStorageEngine
- return Status::NotSupported("append_block_with_partial_content");
- }
-
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS &&
_opts.enable_unique_key_merge_on_write);
DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
@@ -527,10 +522,6 @@ Status VerticalSegmentWriter::_fill_missing_columns(
vectorized::MutableColumns& mutable_full_columns,
const std::vector<bool>& use_default_or_null_flag, bool
has_default_or_nullable,
const size_t& segment_start_pos) {
- if (config::is_cloud_mode()) {
- // TODO(plat1ko): CloudStorageEngine
- return Status::NotSupported("fill_missing_columns");
- }
auto tablet = static_cast<Tablet*>(_tablet.get());
// create old value columns
const auto& missing_cids =
_opts.rowset_ctx->partial_update_info->missing_cids;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 62263b8ae3c..6a6da21ac62 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -119,7 +119,7 @@ void RowsetBuilder::_garbage_collection() {
}
}
-Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>&
mow_context) {
+Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>&
mow_context) {
std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
int64_t cur_max_version = tablet()->max_version_unlocked();
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 5563ece1b38..e54faee3435 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -82,6 +82,8 @@ public:
return _partial_update_info;
}
+ Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
+
protected:
void _build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam*
table_schema_param,
@@ -133,8 +135,6 @@ private:
void _garbage_collection();
- Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
-
// Cast `BaseTablet` to `Tablet`
Tablet* tablet();
TabletSharedPtr tablet_sptr();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index efe63591e44..7f72af7f86c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -146,8 +146,6 @@ using io::FileSystemSPtr;
namespace {
-bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk",
"update_delete_bitmap");
-
bvar::Adder<uint64_t> exceed_version_limit_counter;
bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
&exceed_version_limit_counter, 60);
@@ -169,14 +167,6 @@ void set_last_failure_time(Tablet* tablet, const
Compaction& compaction, int64_t
}
};
-// load segment may do io so it should out lock
-Status _load_rowset_segments(const RowsetSharedPtr& rowset,
- std::vector<segment_v2::SegmentSharedPtr>*
segments) {
- auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
- RETURN_IF_ERROR(beta_rowset->load_segments(segments));
- return Status::OK();
-}
-
} // namespace
struct WriteCooldownMetaExecutors {
@@ -1757,7 +1747,8 @@ Result<std::unique_ptr<RowsetWriter>>
Tablet::create_rowset_writer(RowsetWriterC
// create a rowset writer with rowset_id and seg_id
// after writer, merge this transient rowset with original rowset
Result<std::unique_ptr<RowsetWriter>> Tablet::create_transient_rowset_writer(
- const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info) {
+ const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info,
+ int64_t txn_expiration) {
RowsetWriterContext context;
context.rowset_state = PREPARED;
context.segments_overlap = OVERLAPPING;
@@ -2322,7 +2313,7 @@ Status Tablet::update_delete_bitmap_without_lock(const
TabletSharedPtr& self,
});
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
- RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
+
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
// If this rowset does not have a segment, there is no need for an update.
if (segments.empty()) {
@@ -2365,117 +2356,22 @@ Status Tablet::update_delete_bitmap_without_lock(const
TabletSharedPtr& self,
return Status::OK();
}
-Status Tablet::update_delete_bitmap(const TabletSharedPtr& self, const
TabletTxnInfo* txn_info,
- int64_t txn_id) {
- SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
- RowsetIdUnorderedSet cur_rowset_ids;
- RowsetIdUnorderedSet rowset_ids_to_add;
- RowsetIdUnorderedSet rowset_ids_to_del;
+// Returns the CalcDeleteBitmapExecutor by delegating to `_engine`.
+// Declared as an override in tablet.h.
+CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() {
+ return _engine.calc_delete_bitmap_executor();
+}
+
+Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
+ DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
+ const RowsetIdUnorderedSet& cur_rowset_ids) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
- auto rowset_writer =
- DORIS_TRY(self->create_transient_rowset_writer(*rowset,
txn_info->partial_update_info));
-
- DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
- // Partial update might generate new segments when there is conflicts
while publish, and mark
- // the same key in original segments as delete.
- // When the new segment flush fails or the rowset build fails, the
deletion marker for the
- // duplicate key of the original segment should not remain in
`txn_info->delete_bitmap`,
- // so we need to make a copy of `txn_info->delete_bitmap` and make changes
on it.
- if (txn_info->partial_update_info &&
txn_info->partial_update_info->is_partial_update) {
- delete_bitmap =
std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
- }
-
- OlapStopWatch watch;
- std::vector<segment_v2::SegmentSharedPtr> segments;
- RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
- auto t1 = watch.get_elapse_time_us();
-
- {
- std::shared_lock meta_rlock(self->_meta_lock);
- // tablet is under alter process. The delete bitmap will be calculated
after conversion.
- if (self->tablet_state() == TABLET_NOTREADY) {
- LOG(INFO) << "tablet is under alter process, update delete bitmap
later, tablet_id="
- << self->tablet_id();
- return Status::OK();
- }
- RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1,
&cur_rowset_ids));
- }
- auto t2 = watch.get_elapse_time_us();
-
- _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids,
&rowset_ids_to_add,
- &rowset_ids_to_del);
- for (const auto& to_del : rowset_ids_to_del) {
- delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
- }
-
- std::vector<RowsetSharedPtr> specified_rowsets;
- {
- std::shared_lock meta_rlock(self->_meta_lock);
- specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add);
- }
- auto t3 = watch.get_elapse_time_us();
-
- auto token = self->_engine.calc_delete_bitmap_executor()->create_token();
- RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments,
specified_rowsets, delete_bitmap,
- cur_version - 1, token.get(),
rowset_writer.get()));
- RETURN_IF_ERROR(token->wait());
-
- std::stringstream ss;
- if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
- ss << "cost: " << watch.get_elapse_time_us() - t3 << "(us)";
- } else {
- ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 -
t1
- << ", get rowsets: " << t3 - t2
- << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 <<
")";
- }
-
- size_t total_rows = std::accumulate(
- segments.begin(), segments.end(), 0,
- [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
- LOG(INFO) << "[Publish] construct delete bitmap tablet: " <<
self->tablet_id()
- << ", rowset_ids to add: " << rowset_ids_to_add.size()
- << ", rowset_ids to del: " << rowset_ids_to_del.size()
- << ", cur version: " << cur_version << ", transaction_id: " <<
txn_id << ","
- << ss.str() << " , total rows: " << total_rows;
-
- if (config::enable_merge_on_write_correctness_check && rowset->num_rows()
!= 0) {
- // only do correctness check if the rowset has at least one row written
- // check if all the rowset has ROWSET_SENTINEL_MARK
- auto st = self->check_delete_bitmap_correctness(delete_bitmap,
cur_version - 1, -1,
- cur_rowset_ids,
&specified_rowsets);
- if (!st.ok()) {
- LOG(WARNING) << fmt::format("delete bitmap correctness check
failed in publish phase!");
- }
- self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
- }
-
- if (txn_info->partial_update_info &&
txn_info->partial_update_info->is_partial_update) {
-
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail",
{
- if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-
LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random
failed")
- .tag("txn_id", txn_id);
- return Status::InternalError(
- "debug update_delete_bitmap partial update write
rowset random failed");
- }
- });
- // build rowset writer and merge transient rowset
- RETURN_IF_ERROR(rowset_writer->flush());
- RowsetSharedPtr transient_rowset;
- RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
- rowset->merge_rowset_meta(transient_rowset->rowset_meta());
-
- // erase segment cache cause we will add a segment to rowset
- SegmentLoader::instance()->erase_segments(rowset->rowset_id(),
rowset->num_segments());
- }
-
// update version without write lock, compaction and publish_txn
// will update delete bitmap, handle compaction with _rowset_update_lock
// and publish_txn runs sequential so no need to lock here
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
- self->_tablet_meta->delete_bitmap().merge({std::get<0>(key),
std::get<1>(key), cur_version},
- bitmap);
+ _tablet_meta->delete_bitmap().merge({std::get<0>(key),
std::get<1>(key), cur_version},
+ bitmap);
}
return Status::OK();
@@ -2536,7 +2432,8 @@ Status Tablet::check_rowid_conversion(
return Status::OK();
}
std::vector<segment_v2::SegmentSharedPtr> dst_segments;
- RETURN_IF_ERROR(_load_rowset_segments(dst_rowset, &dst_segments));
+ RETURN_IF_ERROR(
+
std::dynamic_pointer_cast<BetaRowset>(dst_rowset)->load_segments(&dst_segments));
std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>,
HashOfRowsetId>
input_rowsets_segment;
@@ -2545,7 +2442,8 @@ Status Tablet::check_rowid_conversion(
std::vector<segment_v2::SegmentSharedPtr>& segments =
input_rowsets_segment[src_rowset->rowset_id()];
if (segments.empty()) {
- RETURN_IF_ERROR(_load_rowset_segments(src_rowset, &segments));
+ RETURN_IF_ERROR(
+
std::dynamic_pointer_cast<BetaRowset>(src_rowset)->load_segments(&segments));
}
for (auto& [src, dst] : locations) {
std::string src_key;
@@ -2760,81 +2658,4 @@ Status Tablet::ingest_binlog_metas(RowsetBinlogMetasPB*
metas_pb) {
return RowsetMetaManager::ingest_binlog_metas(_data_dir->get_meta(),
tablet_uid(), metas_pb);
}
-void Tablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr
delete_bitmap) {
- for (auto it = delete_bitmap->delete_bitmap.begin(), end =
delete_bitmap->delete_bitmap.end();
- it != end;) {
- if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) {
- it = delete_bitmap->delete_bitmap.erase(it);
- } else {
- ++it;
- }
- }
-}
-
-Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap,
int64_t max_version,
- int64_t txn_id,
- const RowsetIdUnorderedSet&
rowset_ids,
- std::vector<RowsetSharedPtr>*
rowsets) {
- RowsetIdUnorderedSet missing_ids;
- for (const auto& rowsetid : rowset_ids) {
- if (!delete_bitmap->delete_bitmap.contains({rowsetid,
DeleteBitmap::INVALID_SEGMENT_ID,
-
DeleteBitmap::TEMP_VERSION_COMMON})) {
- missing_ids.insert(rowsetid);
- }
- }
-
- if (!missing_ids.empty()) {
- LOG(WARNING) << "[txn_id:" << txn_id << "][tablet_id:" << tablet_id()
- << "][max_version: " << max_version
- << "] check delete bitmap correctness failed!";
- rapidjson::Document root;
- root.SetObject();
- rapidjson::Document required_rowsets_arr;
- required_rowsets_arr.SetArray();
- rapidjson::Document missing_rowsets_arr;
- missing_rowsets_arr.SetArray();
-
- if (rowsets != nullptr) {
- for (const auto& rowset : *rowsets) {
- rapidjson::Value value;
- std::string version_str = rowset->get_rowset_info_str();
- value.SetString(version_str.c_str(), version_str.length(),
- required_rowsets_arr.GetAllocator());
- required_rowsets_arr.PushBack(value,
required_rowsets_arr.GetAllocator());
- }
- } else {
- std::vector<RowsetSharedPtr> rowsets;
- {
- std::shared_lock meta_rlock(_meta_lock);
- rowsets = get_rowset_by_ids(&rowset_ids);
- }
- for (const auto& rowset : rowsets) {
- rapidjson::Value value;
- std::string version_str = rowset->get_rowset_info_str();
- value.SetString(version_str.c_str(), version_str.length(),
- required_rowsets_arr.GetAllocator());
- required_rowsets_arr.PushBack(value,
required_rowsets_arr.GetAllocator());
- }
- }
- for (const auto& missing_rowset_id : missing_ids) {
- rapidjson::Value miss_value;
- std::string rowset_id_str = missing_rowset_id.to_string();
- miss_value.SetString(rowset_id_str.c_str(), rowset_id_str.length(),
- missing_rowsets_arr.GetAllocator());
- missing_rowsets_arr.PushBack(miss_value,
missing_rowsets_arr.GetAllocator());
- }
-
- root.AddMember("required_rowsets", required_rowsets_arr,
root.GetAllocator());
- root.AddMember("missing_rowsets", missing_rowsets_arr,
root.GetAllocator());
- rapidjson::StringBuffer strbuf;
- rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
- root.Accept(writer);
- std::string rowset_status_string = std::string(strbuf.GetString());
- LOG_EVERY_SECOND(WARNING) << rowset_status_string;
- // let it crash if correctness check failed in Debug mode
- DCHECK(false) << "delete bitmap correctness check failed in publish
phase!";
- return Status::InternalError("check delete bitmap failed!");
- }
- return Status::OK();
-}
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index aea9dade310..873f22bff74 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -70,7 +70,6 @@ class TupleDescriptor;
class CalcDeleteBitmapToken;
enum CompressKind : int;
class RowsetBinlogMetasPB;
-struct TabletTxnInfo;
namespace io {
class RemoteFileSystem;
@@ -306,7 +305,8 @@ public:
bool vertical)
override;
Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
- const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info);
+ const Rowset& rowset, std::shared_ptr<PartialUpdateInfo>
partial_update_info,
+ int64_t txn_expiration = 0) override;
Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
RowsetWriterContext& context, const RowsetId& rowset_id);
@@ -370,12 +370,14 @@ public:
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
- static Status update_delete_bitmap(const TabletSharedPtr& self, const
TabletTxnInfo* txn_info,
- int64_t txn_id);
-
static Status update_delete_bitmap_without_lock(const TabletSharedPtr&
self,
const RowsetSharedPtr&
rowset);
+ CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
+ Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
+ DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
+ const RowsetIdUnorderedSet& cur_rowset_ids)
override;
+
void calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets,
const RowIdConversion& rowid_conversion, uint64_t start_version,
uint64_t end_version,
@@ -448,9 +450,6 @@ public:
void set_binlog_config(BinlogConfig binlog_config);
- Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap,
int64_t max_version,
- int64_t txn_id, const
RowsetIdUnorderedSet& rowset_ids,
- std::vector<RowsetSharedPtr>*
rowsets = nullptr);
void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
bool is_alter_failed() { return _alter_failed; }
@@ -488,8 +487,6 @@ private:
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
- void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr
delete_bitmap);
-
public:
static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
diff --git a/be/src/olap/tablet_fwd.h b/be/src/olap/tablet_fwd.h
index c54164c57cb..3583a286643 100644
--- a/be/src/olap/tablet_fwd.h
+++ b/be/src/olap/tablet_fwd.h
@@ -26,6 +26,8 @@ class Tablet;
class TabletSchema;
class TabletMeta;
class DeleteBitmap;
+class CalcDeleteBitmapExecutor;
+struct TabletTxnInfo;
using BaseTabletSPtr = std::shared_ptr<BaseTablet>;
using TabletSharedPtr = std::shared_ptr<Tablet>;
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index 6de3de600d8..538b30099f4 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -44,6 +44,7 @@ public:
TABLET_SCHEMA_CACHE = 14,
CREATE_TABLET_RR_IDX_CACHE = 15,
CLOUD_TABLET_CACHE = 16,
+ CLOUD_TXN_DELETE_BITMAP_CACHE = 17,
};
static std::string type_string(CacheType type) {
@@ -82,6 +83,8 @@ public:
return "CreateTabletRRIdxCache";
case CacheType::CLOUD_TABLET_CACHE:
return "CloudTabletCache";
+ case CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE:
+ return "CloudTxnDeleteBitmapCache";
default:
LOG(FATAL) << "not match type of cache policy :" <<
static_cast<int>(type);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index d201e065804..00ca450ff2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -280,11 +280,6 @@ public class CreateTableStmt extends DdlStmt {
@Override
public void analyze(Analyzer analyzer) throws UserException {
- if (Config.isCloudMode() && properties != null
- &&
properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
- // FIXME: MOW is not supported in cloud mode yet.
- properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE,
"false");
- }
if (Strings.isNullOrEmpty(engineName) ||
engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) {
this.properties = maybeRewriteByAutoBucket(distributionDesc,
properties);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 825e55ee09e..0c73a19c7d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -57,6 +57,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
@@ -82,7 +83,10 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
@SerializedName(value = "createTime")
protected long createTime;
protected QueryableReentrantReadWriteLock rwLock;
-
+ // Used for queuing commit transaction tasks to avoid fdb transaction
conflicts,
+ // especially to reduce conflicts when obtaining delete bitmap update
locks for
+ // MoW table
+ protected ReentrantLock commitLock;
/*
* fullSchema and nameToColumn should contains all columns, both visible
and shadow.
* eg. for OlapTable, when doing schema change, there will be some shadow
columns which are not visible
@@ -131,6 +135,7 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
+ this.commitLock = new ReentrantLock(true);
}
public Table(long id, String tableName, TableType type, List<Column>
fullSchema) {
@@ -155,6 +160,7 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
+ this.commitLock = new ReentrantLock(true);
}
public void markDropped() {
@@ -297,6 +303,28 @@ public abstract class Table extends MetaObject implements
Writable, TableIf {
return false;
}
+    /**
+     * Acquires this table's commit lock, waiting until it becomes available.
+     * Each successful call must be balanced by a call to {@code commitUnlock()}.
+     */
+    public void commitLock() {
+        commitLock.lock();
+    }
+
+    /**
+     * Tries to acquire this table's commit lock, waiting at most {@code timeout}.
+     *
+     * @param timeout maximum time to wait for the lock
+     * @param unit    time unit of {@code timeout}
+     * @return true if the lock was acquired; false if the wait timed out or the
+     *         calling thread was interrupted while waiting
+     */
+    public boolean tryCommitLock(long timeout, TimeUnit unit) {
+        try {
+            boolean res = this.commitLock.tryLock(timeout, unit);
+            // Only waits of at least one second are reported, to avoid noisy logs for short probes.
+            if (!res && unit.toSeconds(timeout) >= 1) {
+                // Report the state of the commit lock itself. The owner of rwLock is unrelated
+                // to who holds the commit lock, so logging it here would be misleading.
+                LOG.warn("Failed to try table {}'s cloud commit lock. timeout {} {}. "
+                                + "Commit lock held: {}, queued threads: {}",
+                        name, timeout, unit.name(), commitLock.isLocked(), commitLock.getQueueLength());
+            }
+            return res;
+        } catch (InterruptedException e) {
+            LOG.warn("failed to try cloud commit lock at table[" + name + "]", e);
+            // Restore the interrupt status so callers further up can still observe the interruption.
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Releases this table's commit lock. Counterpart of {@code commitLock()} and of a
+     * successful {@code tryCommitLock(...)}.
+     */
+    public void commitUnlock() {
+        commitLock.unlock();
+    }
+
public boolean isTypeRead() {
return isTypeRead;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 56c3aed2833..6be3875fef6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -20,7 +20,11 @@ package org.apache.doris.cloud.transaction;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
@@ -33,6 +37,8 @@ import org.apache.doris.cloud.proto.Cloud.CommitTxnRequest;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
+import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
import org.apache.doris.cloud.proto.Cloud.GetTxnResponse;
import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
@@ -46,12 +52,15 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.InternalDatabaseUtil;
+import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
import org.apache.doris.metric.MetricRepo;
@@ -60,7 +69,14 @@ import
org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.RpcException;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.CalcDeleteBitmapTask;
+import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
@@ -77,8 +93,14 @@ import org.apache.doris.transaction.TxnCommitAttachment;
import org.apache.doris.transaction.TxnStateCallbackFactory;
import org.apache.doris.transaction.TxnStateChangeCallback;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -88,11 +110,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
private static final Logger LOG =
LogManager.getLogger(CloudGlobalTransactionMgr.class);
private static final String NOT_SUPPORTED_MSG = "Not supported in cloud
mode";
+ private static final int DELETE_BITMAP_LOCK_EXPIRATION_SECONDS = 10;
+ private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15;
private TxnStateCallbackFactory callbackFactory;
@@ -295,6 +323,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
"disable_load_job is set to true, all load jobs are not
allowed");
}
+ List<OlapTable> mowTableList = getMowTableList(tableList);
+ if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty() &&
!mowTableList.isEmpty()) {
+ calcDeleteBitmapForMow(dbId, mowTableList, transactionId,
tabletCommitInfos);
+ }
CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
builder.setDbId(dbId)
@@ -302,6 +334,13 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
.setIs2Pc(is2PC)
.setCloudUniqueId(Config.cloud_unique_id);
+ // if tablet commit info is empty, no need to pass mowTableList to
meta service.
+ if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty()) {
+ for (OlapTable olapTable : mowTableList) {
+ builder.addMowTableIds(olapTable.getId());
+ }
+ }
+
if (txnCommitAttachment != null) {
if (txnCommitAttachment instanceof LoadJobFinalOperation) {
LoadJobFinalOperation loadJobFinalOperation =
(LoadJobFinalOperation) txnCommitAttachment;
@@ -350,6 +389,11 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
&& commitTxnResponse.getStatus().getCode() !=
MetaServiceCode.TXN_ALREADY_VISIBLE) {
LOG.warn("commitTxn failed, transactionId:{}, retryTime:{},
commitTxnResponse:{}",
transactionId, retryTime, commitTxnResponse);
+ if (commitTxnResponse.getStatus().getCode() ==
MetaServiceCode.LOCK_EXPIRED) {
+ // DELETE_BITMAP_LOCK_ERR will be retried on be
+ throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+ "delete bitmap update lock expired, transactionId:" +
transactionId);
+ }
StringBuilder internalMsgBuilder =
new StringBuilder("commitTxn failed, transactionId:");
internalMsgBuilder.append(transactionId);
@@ -372,6 +416,228 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
+    /**
+     * Picks out the tables that are OlapTables with unique-key merge-on-write enabled.
+     *
+     * @param tableList tables involved in the transaction
+     * @return a new list holding the merge-on-write OlapTables, in their original order
+     */
+    private List<OlapTable> getMowTableList(List<Table> tableList) {
+        List<OlapTable> result = new ArrayList<>();
+        for (Table candidate : tableList) {
+            if (!(candidate instanceof OlapTable)) {
+                continue;
+            }
+            OlapTable asOlap = (OlapTable) candidate;
+            if (asOlap.getEnableUniqueKeyMergeOnWrite()) {
+                result.add(asOlap);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Drives delete bitmap calculation for the merge-on-write tables of a transaction:
+     * groups the committed tablets by backend and partition, takes the delete bitmap update
+     * lock per table, derives the version to calculate for each partition, and finally
+     * dispatches the calculation tasks to the backends and waits for them.
+     *
+     * @param dbId              database id passed on to the calculation tasks
+     * @param tableList         merge-on-write tables touched by the transaction
+     * @param transactionId     id of the transaction being committed
+     * @param tabletCommitInfos tablet/backend pairs reported for the transaction
+     * @throws UserException if no partition info could be collected, or if locking or
+     *                       calculation fails
+     */
+    private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, long transactionId,
+            List<TabletCommitInfo> tabletCommitInfos)
+            throws UserException {
+        Map<Long, Set<Long>> partitionIdsByTable = Maps.newHashMap();
+        Map<Long, Partition> partitionsById = Maps.newHashMap();
+        Map<Long, Map<Long, List<Long>>> tabletsByBackend = Maps.newHashMap();
+        getPartitionInfo(tableList, tabletCommitInfos, partitionIdsByTable, partitionsById, tabletsByBackend);
+        if (tabletsByBackend.isEmpty()) {
+            throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId);
+        }
+
+        // The lock is taken before the partition versions are read.
+        getDeleteBitmapUpdateLock(partitionIdsByTable, transactionId);
+        Map<Long, Long> versionByPartition = getPartitionVersions(partitionsById);
+
+        sendCalcDeleteBitmaptask(dbId, transactionId,
+                getCalcDeleteBitmapInfo(tabletsByBackend, versionByPartition));
+    }
+
+    /**
+     * Fills the three output maps from the committed tablets that belong to one of the given tables.
+     *
+     * @param tableList                 tables of interest; tablets of other tables are ignored
+     * @param tabletCommitInfos         tablet/backend pairs reported for the transaction
+     * @param tableToParttions          output: table id -> ids of partitions that have committed tablets
+     * @param partitions                output: partition id -> partition looked up from its table
+     * @param backendToPartitionTablets output: backend id -> partition id -> committed tablet ids
+     */
+    private void getPartitionInfo(List<OlapTable> tableList,
+            List<TabletCommitInfo> tabletCommitInfos,
+            Map<Long, Set<Long>> tableToParttions,
+            Map<Long, Partition> partitions,
+            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets) {
+        Map<Long, OlapTable> tablesById = Maps.newHashMap();
+        for (OlapTable table : tableList) {
+            tablesById.put(table.getId(), table);
+        }
+
+        List<Long> tabletIds = tabletCommitInfos.stream()
+                .map(TabletCommitInfo::getTabletId).collect(Collectors.toList());
+        List<TabletMeta> tabletMetas = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMetaList(tabletIds);
+        // NOTE(review): entry idx of tabletMetas is used together with entry idx of tabletIds and
+        // tabletCommitInfos, i.e. getTabletMetaList is presumed to return one meta per requested id,
+        // in request order -- confirm against TabletInvertedIndex.
+        for (int idx = 0; idx < tabletMetas.size(); idx++) {
+            TabletMeta meta = tabletMetas.get(idx);
+            long tableId = meta.getTableId();
+            OlapTable owner = tablesById.get(tableId);
+            if (owner == null) {
+                // Tablet does not belong to any of the given tables.
+                continue;
+            }
+
+            long partitionId = meta.getPartitionId();
+            long backendId = tabletCommitInfos.get(idx).getBackendId();
+
+            tableToParttions.computeIfAbsent(tableId, k -> Sets.newHashSet()).add(partitionId);
+            backendToPartitionTablets.computeIfAbsent(backendId, k -> Maps.newHashMap())
+                    .computeIfAbsent(partitionId, k -> Lists.newArrayList())
+                    .add(tabletIds.get(idx));
+            partitions.putIfAbsent(partitionId, owner.getPartition(partitionId));
+        }
+    }
+
+    /**
+     * Derives, for every partition, the version the delete bitmap should be calculated for:
+     * the visible version plus one, or 2 when the visible version is not positive.
+     *
+     * @param partitionMap partition id -> partition
+     * @return partition id -> version to calculate
+     */
+    private Map<Long, Long> getPartitionVersions(Map<Long, Partition> partitionMap) {
+        Map<Long, Long> versionByPartition = Maps.newHashMap();
+        for (Map.Entry<Long, Partition> entry : partitionMap.entrySet()) {
+            long visible = entry.getValue().getVisibleVersion();
+            long next;
+            if (visible <= 0) {
+                next = 2;
+            } else {
+                next = visible + 1;
+            }
+            versionByPartition.put(entry.getKey(), next);
+        }
+        return versionByPartition;
+    }
+
+    /**
+     * Converts the backend -> partition -> tablets grouping into the per-backend list of
+     * TCalcDeleteBitmapPartitionInfo, attaching the version chosen for each partition.
+     *
+     * @param backendToPartitionTablets backend id -> partition id -> tablet ids
+     * @param partitionVersions         partition id -> version to calculate
+     * @return backend id -> partition infos for that backend
+     */
+    private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getCalcDeleteBitmapInfo(
+            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets, Map<Long, Long> partitionVersions) {
+        Map<Long, List<TCalcDeleteBitmapPartitionInfo>> infosByBackend = Maps.newHashMap();
+        backendToPartitionTablets.forEach((backendId, tabletsByPartition) -> {
+            List<TCalcDeleteBitmapPartitionInfo> infos = Lists.newArrayList();
+            tabletsByPartition.forEach((partitionId, tablets) ->
+                    infos.add(new TCalcDeleteBitmapPartitionInfo(partitionId,
+                            partitionVersions.get(partitionId),
+                            tablets)));
+            infosByBackend.put(backendId, infos);
+        });
+        return infosByBackend;
+    }
+
+    /**
+     * Requests the delete bitmap update lock from the meta service for every table that is a key
+     * of {@code tableToParttions}, using the transaction id as the lock id.
+     *
+     * <p>Each request is attempted up to {@code Config.meta_service_rpc_retry_times} times. An
+     * attempt is retried when the RPC throws or when the meta service answers LOCK_CONFLICT or
+     * KV_TXN_CONFLICT; a random 20-200 ms pause separates attempts.
+     *
+     * @param tableToParttions table id -> partition ids; only the table ids are used here
+     * @param transactionId    id of the transaction, used as the lock id
+     * @throws UserException with DELETE_BITMAP_LOCK_ERR if the last response still reports a
+     *                       conflict; a plain UserException for any other non-OK code or when
+     *                       no response could be obtained at all
+     */
+    private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId)
+            throws UserException {
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
+            GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder();
+            builder.setTableId(entry.getKey())
+                    .setLockId(transactionId)
+                    .setInitiator(-1)
+                    .setExpiration(DELETE_BITMAP_LOCK_EXPIRATION_SECONDS);
+            final GetDeleteBitmapUpdateLockRequest request = builder.build();
+            GetDeleteBitmapUpdateLockResponse response = null;
+            Exception lastException = null;
+
+            // retryTime counts the attempts actually made, so the failure log below is accurate.
+            int retryTime = 0;
+            while (retryTime < Config.meta_service_rpc_retry_times) {
+                retryTime++;
+                try {
+                    response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}",
+                                transactionId, request, response);
+                    }
+                    if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT
+                            && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
+                        break;
+                    }
+                } catch (Exception e) {
+                    lastException = e;
+                    LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}",
+                            transactionId, retryTime, e);
+                }
+                if (retryTime >= Config.meta_service_rpc_retry_times) {
+                    // No further attempt will be made, so there is nothing to wait for.
+                    break;
+                }
+                // sleep random millis [20, 200] ms, avoid txn conflict
+                int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("randomMillis:{}", randomMillis);
+                }
+                try {
+                    Thread.sleep(randomMillis);
+                } catch (InterruptedException e) {
+                    LOG.info("InterruptedException: ", e);
+                }
+            }
+            if (response == null) {
+                // Every attempt threw (or none was made): surface this as the declared checked
+                // exception rather than letting a NullPointerException escape.
+                String reason = lastException == null
+                        ? "no rpc attempt was made" : String.valueOf(lastException.getMessage());
+                LOG.warn("get delete bitmap lock failed without response, transactionId={}, for {} times",
+                        transactionId, retryTime);
+                throw new UserException("Failed to get delete bitmap lock, no response from meta service"
+                        + ", transactionId=" + transactionId + ", reason: " + reason);
+            }
+            Preconditions.checkNotNull(response.getStatus());
+            if (response.getStatus().getCode() != MetaServiceCode.OK) {
+                LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}",
+                        transactionId, retryTime, response);
+                if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT
+                        || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) {
+                    // DELETE_BITMAP_LOCK_ERR will be retried on be
+                    throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+                            "Failed to get delete bitmap lock due to confilct");
+                }
+                throw new UserException("Failed to get delete bitmap lock, code: " + response.getStatus().getCode());
+            }
+        }
+        stopWatch.stop();
+        LOG.info("get delete bitmap lock successfully. txns: {}. time cost: {} ms.",
+                transactionId, stopWatch.getTime());
+    }
+
+ /**
+ * Sends one CalcDeleteBitmapTask per backend and waits for all of them to report back,
+ * for at most CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS seconds.
+ *
+ * @param dbId database id passed to each task
+ * @param transactionId transaction id passed to each task and used as the latch mark value
+ * @param backendToPartitionInfos backend id -> partition infos to calculate on that backend;
+ * nothing is sent when this map is empty
+ * @throws UserException with DELETE_BITMAP_LOCK_ERR when the wait times out, is interrupted,
+ * or the latch status carries DELETE_BITMAP_LOCK_ERROR; a plain
+ * UserException when the latch status carries any other error code
+ */
+ private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
+ Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos)
+ throws UserException {
+ if (backendToPartitionInfos.isEmpty()) {
+ return;
+ }
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ // One task, and therefore one latch count, per backend.
+ int totalTaskNum = backendToPartitionInfos.size();
+ MarkedCountDownLatch<Long, Long> countDownLatch = new
MarkedCountDownLatch<Long, Long>(
+ totalTaskNum);
+ AgentBatchTask batchTask = new AgentBatchTask();
+ for (Map.Entry<Long, List<TCalcDeleteBitmapPartitionInfo>> entry :
backendToPartitionInfos.entrySet()) {
+ CalcDeleteBitmapTask task = new
CalcDeleteBitmapTask(entry.getKey(),
+ transactionId,
+ dbId,
+ entry.getValue(),
+ countDownLatch);
+ // Mark is (backend id, transaction id); unfinished marks are reported on timeout below.
+ countDownLatch.addMark(entry.getKey(), transactionId);
+ // add to AgentTaskQueue for handling finish report.
+ // the return value is not checked, because the add is expected to succeed
+ AgentTaskQueue.addTask(task);
+ batchTask.addTask(task);
+ }
+ AgentTaskExecutor.submit(batchTask);
+
+ boolean ok;
+ try {
+ ok =
countDownLatch.await(CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException: ", e);
+ // An interrupted wait is handled like a failed wait.
+ ok = false;
+ }
+
+ if (!ok || !countDownLatch.getStatus().ok()) {
+ String errMsg = "Failed to calculate delete bitmap.";
+ // clear tasks
+ AgentTaskQueue.removeBatchTask(batchTask,
TTaskType.CALCULATE_DELETE_BITMAP);
+
+ if (!countDownLatch.getStatus().ok()) {
+ errMsg += countDownLatch.getStatus().getErrorMsg();
+ // Any error other than DELETE_BITMAP_LOCK_ERROR is thrown as a plain UserException,
+ // i.e. without the DELETE_BITMAP_LOCK_ERR code used further down.
+ if (countDownLatch.getStatus().getErrorCode() !=
TStatusCode.DELETE_BITMAP_LOCK_ERROR) {
+ throw new UserException(errMsg);
+ }
+ } else {
+ // Status is still ok, so the wait itself did not complete (timeout or interrupt).
+ errMsg += " Timeout.";
+ List<Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
+ // only show at most 3 results
+ List<Entry<Long, Long>> subList = unfinishedMarks.subList(0,
+ Math.min(unfinishedMarks.size(), 3));
+ if (!subList.isEmpty()) {
+ errMsg += " Unfinished mark: " + Joiner.on(",
").join(subList);
+ }
+ }
+ LOG.warn(errMsg);
+ // DELETE_BITMAP_LOCK_ERR will be retried on be
+ throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
errMsg);
+ }
+ stopWatch.stop();
+ LOG.info("calc delete bitmap task successfully. txns: {}. time cost:
{} ms.",
+ transactionId, stopWatch.getTime());
+ }
+
@Override
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis)
@@ -383,7 +649,17 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment
txnCommitAttachment) throws UserException {
- commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ if (!MetaLockUtils.tryCommitLockTables(tableList, timeoutMillis,
TimeUnit.MILLISECONDS)) {
+ // DELETE_BITMAP_LOCK_ERR will be retried on be
+ throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+ "get table cloud commit lock timeout, tableList=("
+ + StringUtils.join(tableList, ",") + ")");
+ }
+ try {
+ commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
+ } finally {
+ MetaLockUtils.commitUnlockTables(tableList);
+ }
return true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index 2bbd5c58efa..944f8e04860 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -34,7 +34,10 @@ public enum InternalErrorCode {
MANUAL_STOP_ERR(101),
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
- TASKS_ABORT_ERR(104);
+ TASKS_ABORT_ERR(104),
+
+ // for MoW table
+ DELETE_BITMAP_LOCK_ERR(301);
private long errCode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index ece62cbc10d..084c6f25f79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -18,6 +18,7 @@
package org.apache.doris.common.util;
import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.MetaNotFoundException;
@@ -112,4 +113,27 @@ public class MetaLockUtils {
}
}
+ public static void commitLockTables(List<Table> tableList) {
+ for (Table table : tableList) {
+ table.commitLock();
+ }
+ }
+
+ public static void commitUnlockTables(List<Table> tableList) {
+ for (int i = tableList.size() - 1; i >= 0; i--) {
+ tableList.get(i).commitUnlock();
+ }
+ }
+
+ public static boolean tryCommitLockTables(List<Table> tableList, long
timeout, TimeUnit unit) {
+ for (int i = 0; i < tableList.size(); i++) {
+ if (!tableList.get(i).tryCommitLock(timeout, unit)) {
+ for (int j = i - 1; j >= 0; j--) {
+ tableList.get(j).commitUnlock();
+ }
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 5625e9626de..9e07574e57a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -1439,7 +1439,7 @@ public class PropertyAnalyzer {
// the user doesn't specify the property in
`CreateTableStmt`/`CreateTableInfo`
public static Map<String, String>
enableUniqueKeyMergeOnWriteIfNotExists(Map<String, String> properties) {
if (Config.isCloudMode()) {
- // FIXME: MOW is not supported in cloud mode yet.
+ // the default value of enable_unique_key_merge_on_write is still false
in cloud mode.
return properties;
}
if (properties != null &&
properties.get(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE) == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 485463d8daf..12f1600cd3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -37,6 +37,7 @@ import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.task.AlterReplicaTask;
+import org.apache.doris.task.CalcDeleteBitmapTask;
import org.apache.doris.task.CheckConsistencyTask;
import org.apache.doris.task.ClearAlterTask;
import org.apache.doris.task.CloneTask;
@@ -137,7 +138,8 @@ public class MasterImpl {
&& taskType != TTaskType.DOWNLOAD && taskType !=
TTaskType.MOVE
&& taskType != TTaskType.CLONE && taskType !=
TTaskType.PUBLISH_VERSION
&& taskType != TTaskType.CREATE && taskType !=
TTaskType.UPDATE_TABLET_META_INFO
- && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
+ && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
+ && taskType != TTaskType.CALCULATE_DELETE_BITMAP) {
return result;
}
}
@@ -203,6 +205,9 @@ public class MasterImpl {
case PUSH_COOLDOWN_CONF:
finishPushCooldownConfTask(task);
break;
+ case CALCULATE_DELETE_BITMAP:
+ finishCalcDeleteBitmap(task, request);
+ break;
default:
break;
}
@@ -628,4 +633,28 @@ public class MasterImpl {
cooldownTask.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUSH_COOLDOWN_CONF, task.getSignature());
}
+
+ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest
request) {
+ // If we get here, this task will be removed from AgentTaskQueue for
certain,
+ // because in this function the only problem that can cause failure is
missing meta,
+ // and if meta is missing, we no longer need to resend this task.
+ try {
+ CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask)
task;
+ if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
+
calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(),
+ "backend: " + task.getBackendId() + ",
error_tablet_size: "
+ + request.getErrorTabletIdsSize() + ",
err_msg: "
+ +
request.getTaskStatus().getErrorMsgs().toString());
+ } else {
+ calcDeleteBitmapTask.countDownLatch(task.getBackendId(),
calcDeleteBitmapTask.getTransactionId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("finish calc delete bitmap. transaction id: {},
be: {}, report version: {}",
+ calcDeleteBitmapTask.getTransactionId(),
calcDeleteBitmapTask.getBackendId(),
+ request.getReportVersion());
+ }
+ }
+ } finally {
+ AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.CALCULATE_DELETE_BITMAP, task.getSignature());
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 3620f2e0413..400db4fc65f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -225,12 +225,6 @@ public class CreateTableInfo {
properties = Maps.newHashMap();
}
- if (Config.isCloudMode() && properties != null
- &&
properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
- // FIXME: MOW is not supported in cloud mode yet.
- properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE,
"false");
- }
-
if (Strings.isNullOrEmpty(engineName) ||
engineName.equalsIgnoreCase("olap")) {
if (distribution == null) {
throw new AnalysisException("Create olap table should contain
distribution desc");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 1f2e662c757..1de23992fed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.thrift.TAgentServiceVersion;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TAlterInvertedIndexReq;
import org.apache.doris.thrift.TAlterTabletReqV2;
+import org.apache.doris.thrift.TCalcDeleteBitmapRequest;
import org.apache.doris.thrift.TCheckConsistencyReq;
import org.apache.doris.thrift.TClearAlterTaskRequest;
import org.apache.doris.thrift.TClearTransactionTaskRequest;
@@ -392,6 +393,15 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setGcBinlogReq(request);
return tAgentTaskRequest;
}
+ case CALCULATE_DELETE_BITMAP: {
+ CalcDeleteBitmapTask calcDeleteBitmapTask =
(CalcDeleteBitmapTask) task;
+ TCalcDeleteBitmapRequest request =
calcDeleteBitmapTask.toThrift();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request.toString());
+ }
+ tAgentTaskRequest.setCalcDeleteBitmapReq(request);
+ return tAgentTaskRequest;
+ }
default:
if (LOG.isDebugEnabled()) {
LOG.debug("could not find task type for task [{}]", task);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
new file mode 100644
index 00000000000..4188cf61849
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
+import org.apache.doris.thrift.TCalcDeleteBitmapRequest;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CalcDeleteBitmapTask extends AgentTask {
+ private static final Logger LOG =
LogManager.getLogger(CreateReplicaTask.class);
+ // used for synchronous process
+ private MarkedCountDownLatch<Long, Long> latch;
+ private long transactionId;
+ private List<TCalcDeleteBitmapPartitionInfo> partitionInfos;
+ private List<Long> errorTablets;
+
+ public CalcDeleteBitmapTask(long backendId, long transactionId, long dbId,
+ List<TCalcDeleteBitmapPartitionInfo> partitionInfos,
+ MarkedCountDownLatch<Long, Long> latch) {
+ super(null, backendId, TTaskType.CALCULATE_DELETE_BITMAP, dbId, -1L,
-1L, -1L, -1L, transactionId);
+ this.transactionId = transactionId;
+ this.partitionInfos = partitionInfos;
+ this.errorTablets = new ArrayList<Long>();
+ this.isFinished = false;
+ this.latch = latch;
+ }
+
+ public void countDownLatch(long backendId, long transactionId) {
+ if (this.latch != null) {
+ if (latch.markedCountDown(backendId, transactionId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CalcDeleteBitmapTask current latch count: {},
backend: {}, transactionId:{}",
+ latch.getCount(), backendId, transactionId);
+ }
+ }
+ }
+ }
+
+ // Calling this always means one of the tasks has failed; count down to zero to
finish the entire task.
+ public void countDownToZero(String errMsg) {
+ if (this.latch != null) {
+ latch.countDownToZero(new Status(TStatusCode.CANCELLED, errMsg));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CalcDeleteBitmapTask download to zero. error msg:
{}", errMsg);
+ }
+ }
+ }
+
+ public void countDownToZero(TStatusCode code, String errMsg) {
+ if (this.latch != null) {
+ latch.countDownToZero(new Status(code, errMsg));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CalcDeleteBitmapTask download to zero. error msg:
{}", errMsg);
+ }
+ }
+ }
+
+ public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
+ this.latch = latch;
+ }
+
+ public TCalcDeleteBitmapRequest toThrift() {
+ TCalcDeleteBitmapRequest calcDeleteBitmapRequest = new
TCalcDeleteBitmapRequest(transactionId,
+ partitionInfos);
+ return calcDeleteBitmapRequest;
+ }
+
+ public long getTransactionId() {
+ return transactionId;
+ }
+
+ public List<TCalcDeleteBitmapPartitionInfo>
getCalcDeleteBimapPartitionInfos() {
+ return partitionInfos;
+ }
+
+ public synchronized List<Long> getErrorTablets() {
+ return errorTablets;
+ }
+
+ public synchronized void addErrorTablets(List<Long> errorTablets) {
+ this.errorTablets.clear();
+ if (errorTablets == null) {
+ return;
+ }
+ this.errorTablets.addAll(errorTablets);
+ }
+
+ public void setIsFinished(boolean isFinished) {
+ this.isFinished = isFinished;
+ }
+
+ public boolean isFinished() {
+ return isFinished;
+ }
+}
diff --git
a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 1518def58c1..ba7214cc43c 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -16,7 +16,7 @@
// under the License.
testGroups = "p0"
-testDirectories =
"ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_
[...]
+testDirectories =
"ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_
[...]
//exclude groups and exclude suites is more prior than include groups and
include suites.
-excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external"
-excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery"
+excludeSuites =
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
+excludeDirectories =
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]