This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 84f61c986bc branch-3.0-pick: [Fix](cloud) Fix dup key problem when
`enable_new_tablet_do_compaction=true` (#48399) (#49019)
84f61c986bc is described below
commit 84f61c986bc6596e5b83e24d9e59a6df214fb553
Author: bobhan1 <[email protected]>
AuthorDate: Fri Mar 14 09:59:59 2025 +0800
branch-3.0-pick: [Fix](cloud) Fix dup key problem when
`enable_new_tablet_do_compaction=true` (#48399) (#49019)
pick https://github.com/apache/doris/pull/48399
---
be/src/cloud/cloud_compaction_stop_token.cpp | 125 ++++++++++++++++++
be/src/cloud/cloud_compaction_stop_token.h | 45 +++++++
.../cloud/cloud_cumulative_compaction_policy.cpp | 7 +-
be/src/cloud/cloud_schema_change_job.cpp | 7 +
be/src/cloud/cloud_storage_engine.cpp | 65 ++++++++++
be/src/cloud/cloud_storage_engine.h | 8 ++
cloud/src/meta-service/meta_service_job.cpp | 79 +++++++++++-
gensrc/proto/cloud.proto | 1 +
.../cloud/test_cloud_mow_new_tablet_compaction.out | Bin 0 -> 206 bytes
.../test_cloud_mow_new_tablet_compaction.groovy | 143 +++++++++++++++++++++
10 files changed, 477 insertions(+), 3 deletions(-)
diff --git a/be/src/cloud/cloud_compaction_stop_token.cpp
b/be/src/cloud/cloud_compaction_stop_token.cpp
new file mode 100644
index 00000000000..9d6f1b614a6
--- /dev/null
+++ b/be/src/cloud/cloud_compaction_stop_token.cpp
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_compaction_stop_token.h"
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "gen_cpp/cloud.pb.h"
+
+namespace doris {
+
+// Creates a stop token bound to `tablet`. `initiator` is stored and later sent as
+// the job's delete_bitmap_lock_initiator; a newly generated UUID becomes the job id.
+CloudCompactionStopToken::CloudCompactionStopToken(CloudStorageEngine& engine,
+                                                   CloudTabletSPtr tablet, int64_t initiator)
+        : _engine(engine), _tablet(std::move(tablet)), _initiator(initiator) {
+    // The generated UUID is streamed into text because the job id is kept as a std::string.
+    std::stringstream uuid_text;
+    uuid_text << UUIDGenerator::instance()->next_uuid();
+    _uuid = uuid_text.str();
+}
+
+// Renews the lease of this token's job through the meta manager.
+// Nothing is returned: a failed RPC is only reported with a warning log.
+void CloudCompactionStopToken::do_lease() {
+    using std::chrono::duration_cast;
+    using std::chrono::seconds;
+    using std::chrono::system_clock;
+
+    // New lease deadline: current epoch seconds plus four lease intervals.
+    const int64_t now_sec = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
+    const int64_t lease_deadline = now_sec + (config::lease_compaction_interval_seconds * 4);
+
+    cloud::TabletJobInfoPB lease_req;
+    auto* tablet_idx = lease_req.mutable_idx();
+    tablet_idx->set_tablet_id(_tablet->tablet_id());
+    tablet_idx->set_table_id(_tablet->table_id());
+    tablet_idx->set_index_id(_tablet->index_id());
+    tablet_idx->set_partition_id(_tablet->partition_id());
+
+    // The job is identified by the UUID generated in the constructor.
+    auto* token_job = lease_req.add_compaction();
+    token_job->set_id(_uuid);
+    token_job->set_lease(lease_deadline);
+
+    auto status = _engine.meta_mgr().lease_tablet_job(lease_req);
+    if (status.ok()) {
+        return;
+    }
+    LOG_WARNING("failed to lease compaction stop token")
+            .tag("job_id", _uuid)
+            .tag("delete_bitmap_lock_initiator", _initiator)
+            .tag("tablet_id", _tablet->tablet_id())
+            .error(status);
+}
+
+// Submits this token to the meta manager as a STOP_TOKEN compaction job.
+// Returns the status of prepare_tablet_job; a failure is additionally logged.
+Status CloudCompactionStopToken::do_register() {
+    // Read both compaction counters in one critical section under the header lock.
+    int64_t base_cnt = 0;
+    int64_t cumu_cnt = 0;
+    {
+        std::lock_guard header_guard {_tablet->get_header_lock()};
+        base_cnt = _tablet->base_compaction_cnt();
+        cumu_cnt = _tablet->cumulative_compaction_cnt();
+    }
+
+    cloud::TabletJobInfoPB request;
+    auto* tablet_idx = request.mutable_idx();
+    tablet_idx->set_tablet_id(_tablet->tablet_id());
+    tablet_idx->set_table_id(_tablet->table_id());
+    tablet_idx->set_index_id(_tablet->index_id());
+    tablet_idx->set_partition_id(_tablet->partition_id());
+
+    auto* token_job = request.add_compaction();
+    token_job->set_id(_uuid);
+    token_job->set_delete_bitmap_lock_initiator(_initiator);
+    // The job initiator is this backend, written as "<localhost>:<heartbeat port>".
+    token_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                             std::to_string(config::heartbeat_service_port));
+    token_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN);
+    // required by MS to check if it's a valid compaction job
+    token_job->set_base_compaction_cnt(base_cnt);
+    token_job->set_cumulative_compaction_cnt(cumu_cnt);
+
+    // Expiration and lease are both absolute epoch-second timestamps derived from "now".
+    const int64_t now_sec = std::chrono::duration_cast<std::chrono::seconds>(
+                                    std::chrono::system_clock::now().time_since_epoch())
+                                    .count();
+    token_job->set_expiration(now_sec + config::compaction_timeout_seconds);
+    token_job->set_lease(now_sec + (config::lease_compaction_interval_seconds * 4));
+
+    cloud::StartTabletJobResponse response;
+    auto status = _engine.meta_mgr().prepare_tablet_job(request, &response);
+    if (!status.ok()) {
+        LOG_WARNING("failed to register compaction stop token")
+                .tag("job_id", _uuid)
+                .tag("delete_bitmap_lock_initiator", _initiator)
+                .tag("tablet_id", _tablet->tablet_id())
+                .error(status);
+    }
+    return status;
+}
+
+// Asks the meta manager to abort this token's STOP_TOKEN job.
+// Returns the status of abort_tablet_job; a failure is additionally logged.
+Status CloudCompactionStopToken::do_unregister() {
+    cloud::TabletJobInfoPB request;
+
+    auto* tablet_idx = request.mutable_idx();
+    tablet_idx->set_tablet_id(_tablet->tablet_id());
+    tablet_idx->set_table_id(_tablet->table_id());
+    tablet_idx->set_index_id(_tablet->index_id());
+    tablet_idx->set_partition_id(_tablet->partition_id());
+
+    // Same identity fields as used on registration: job id, lock initiator, backend address, type.
+    auto* token_job = request.add_compaction();
+    token_job->set_id(_uuid);
+    token_job->set_delete_bitmap_lock_initiator(_initiator);
+    token_job->set_initiator(BackendOptions::get_localhost() + ':' +
+                             std::to_string(config::heartbeat_service_port));
+    token_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN);
+
+    auto status = _engine.meta_mgr().abort_tablet_job(request);
+    if (!status.ok()) {
+        LOG_WARNING("failed to unregister compaction stop token")
+                .tag("job_id", _uuid)
+                .tag("delete_bitmap_lock_initiator", _initiator)
+                .tag("tablet_id", _tablet->tablet_id())
+                .error(status);
+    }
+    return status;
+}
+
+int64_t CloudCompactionStopToken::initiator() const { // Accessor for the initiator value supplied at construction.
+ return _initiator;
+}
+} // namespace doris
diff --git a/be/src/cloud/cloud_compaction_stop_token.h
b/be/src/cloud/cloud_compaction_stop_token.h
new file mode 100644
index 00000000000..ce61ebc3747
--- /dev/null
+++ b/be/src/cloud/cloud_compaction_stop_token.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+
+namespace doris {
+
+class CloudCompactionStopToken { // One STOP_TOKEN compaction job for a single tablet; all RPCs go through the engine's meta_mgr().
+public:
+ CloudCompactionStopToken(CloudStorageEngine& engine, CloudTabletSPtr
tablet, int64_t initiator); // Generates a fresh UUID that is used as the job id.
+ ~CloudCompactionStopToken() = default;
+
+ void do_lease(); // Sends lease_tablet_job with a new lease time; a failure is only logged.
+ Status do_register(); // Sends prepare_tablet_job for the STOP_TOKEN job and returns its status.
+ Status do_unregister(); // Sends abort_tablet_job for the STOP_TOKEN job and returns its status.
+
+ int64_t initiator() const; // Returns the initiator value passed to the constructor.
+
+private:
+ CloudStorageEngine& _engine; // Used through meta_mgr() to issue the job RPCs.
+ CloudTabletSPtr _tablet; // Supplies the tablet/table/index/partition ids placed in each request.
+ std::string _uuid; // Job id set on every request built by this token.
+ int64_t _initiator; // Sent as delete_bitmap_lock_initiator on register and unregister.
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 9e3ca3eb3db..c49448e1998 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -66,8 +66,13 @@ int64_t
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
input_rowsets->push_back(rowset);
}
}
+ LOG_INFO(
+
"[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_"
+ "input_rowsets] tablet_id={}, start={}, end={}, "
+ "input_rowsets->size()={}",
+ target_tablet_id, start_version, end_version,
input_rowsets->size());
+ return input_rowsets->size();
}
- return input_rowsets->size();
})
size_t promotion_size = cloud_promotion_size(tablet);
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index a109640e467..79a502587f6 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -418,6 +418,11 @@ Status
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
.tag("out_rowset_size", _output_rowsets.size())
.tag("start_calc_delete_bitmap_version",
start_calc_delete_bitmap_version)
.tag("alter_version", alter_version);
+
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
initiator));
+ Defer defer {[&]() {
+
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet));
+ }};
+
TabletMetaSharedPtr tmp_meta =
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap().delete_bitmap.clear();
std::shared_ptr<CloudTablet> tmp_tablet =
@@ -437,6 +442,8 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t
alter_version,
if (max_version >= start_calc_delete_bitmap_version) {
RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
{start_calc_delete_bitmap_version, max_version},
&incremental_rowsets));
+
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
+ DBUG_BLOCK);
for (auto rowset : incremental_rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet,
rowset));
}
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 9c403ac8e3b..52ab28b52c3 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -25,9 +25,11 @@
#include <rapidjson/stringbuffer.h>
#include <algorithm>
+#include <mutex>
#include <variant>
#include "cloud/cloud_base_compaction.h"
+#include "cloud/cloud_compaction_stop_token.h"
#include "cloud/cloud_cumulative_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_full_compaction.h"
@@ -37,6 +39,8 @@
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
+#include "common/config.h"
+#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
@@ -758,6 +762,7 @@ void
CloudStorageEngine::_lease_compaction_thread_callback() {
std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
std::vector<std::shared_ptr<CloudCumulativeCompaction>>
cumu_compactions;
+ std::vector<std::shared_ptr<CloudCompactionStopToken>>
compation_stop_tokens;
{
std::lock_guard lock(_compaction_mtx);
for (auto& [_, base] : _submitted_base_compactions) {
@@ -775,8 +780,16 @@ void
CloudStorageEngine::_lease_compaction_thread_callback() {
full_compactions.push_back(full);
}
}
+ for (auto& [_, stop_token] : _active_compaction_stop_tokens) {
+ if (stop_token) {
+ compation_stop_tokens.push_back(stop_token);
+ }
+ }
}
// TODO(plat1ko): Support batch lease rpc
+ for (auto& stop_token : compation_stop_tokens) {
+ stop_token->do_lease();
+ }
for (auto& comp : full_compactions) {
comp->do_lease();
}
@@ -854,5 +867,57 @@ std::shared_ptr<CloudCumulativeCompactionPolicy>
CloudStorageEngine::cumu_compac
return _cumulative_compaction_policies.at(compaction_policy);
}
+// Creates and registers a compaction stop token for `tablet`.
+// Returns AlreadyExist when the tablet already has an entry in the active-token map,
+// or the failure status of the token's do_register() call; otherwise the OK status.
+Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet,
+                                                          int64_t initiator) {
+    const auto tablet_id = tablet->tablet_id();
+
+    // Claim the map slot with a null placeholder first, so the registration RPC
+    // below runs without holding `_compaction_mtx`.
+    {
+        std::lock_guard guard(_compaction_mtx);
+        const bool inserted = _active_compaction_stop_tokens.emplace(tablet_id, nullptr).second;
+        if (!inserted) {
+            return Status::AlreadyExist("stop token already exists for tablet_id={}", tablet_id);
+        }
+    }
+
+    auto token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator);
+    auto status = token->do_register();
+
+    {
+        std::lock_guard guard(_compaction_mtx);
+        if (!status.ok()) {
+            // Registration failed: give the slot back.
+            _active_compaction_stop_tokens.erase(tablet_id);
+            return status;
+        }
+        // Replace the placeholder with the registered token.
+        _active_compaction_stop_tokens[tablet_id] = token;
+    }
+
+    LOG_INFO(
+            "successfully register compaction stop token for tablet_id={}, "
+            "delete_bitmap_lock_initiator={}",
+            tablet_id, initiator);
+    return status;
+}
+
+// Removes the stop-token entry of `tablet` from the active-token map.
+// Returns NotFound when the tablet has no entry, OK otherwise.
+// Only the local bookkeeping is dropped here; no abort RPC is sent (see the note below).
+Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet) {
+    const auto tablet_id = tablet->tablet_id();
+    std::shared_ptr<CloudCompactionStopToken> stop_token;
+    {
+        std::lock_guard lock(_compaction_mtx);
+        auto it = _active_compaction_stop_tokens.find(tablet_id);
+        if (it == _active_compaction_stop_tokens.end()) {
+            return Status::NotFound("stop token not found for tablet_id={}", tablet_id);
+        }
+        stop_token = std::move(it->second);
+        // Erase through the iterator to avoid a second hash lookup.
+        _active_compaction_stop_tokens.erase(it);
+    }
+    // stop token will be removed when SC commit or abort
+    // RETURN_IF_ERROR(stop_token->do_unregister());
+
+    // register_compaction_stop_token() stores a null placeholder until do_register()
+    // returns, so the mapped pointer may be null; never dereference it unchecked.
+    const int64_t initiator = stop_token ? stop_token->initiator() : -1;
+    LOG_INFO(
+            "successfully unregister compaction stop token for tablet_id={}, "
+            "delete_bitmap_lock_initiator={}",
+            tablet_id, initiator);
+    return Status::OK();
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 5e51285d93a..6381fbe6001 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -44,6 +44,7 @@ class CloudBaseCompaction;
class CloudFullCompaction;
class TabletHotspot;
class CloudWarmUpManager;
+class CloudCompactionStopToken;
class CloudStorageEngine final : public BaseStorageEngine {
public:
@@ -143,6 +144,10 @@ public:
return *_sync_load_for_tablets_thread_pool;
}
+ Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t
initiator);
+
+ Status unregister_compaction_stop_token(CloudTabletSPtr tablet);
+
private:
void _refresh_storage_vault_info_thread_callback();
void _vacuum_stale_rowsets_thread_callback();
@@ -188,6 +193,9 @@ private:
// tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx`
std::unordered_map<int64_t,
std::vector<std::shared_ptr<CloudCumulativeCompaction>>>
_submitted_cumu_compactions;
+ // tablet_id -> active compaction stop tokens
+ std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>>
+ _active_compaction_stop_tokens;
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 5299b85f41d..99b61e7ebf0 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -20,6 +20,7 @@
#include <gen_cpp/cloud.pb.h>
#include <glog/logging.h>
+#include <algorithm>
#include <chrono>
#include <cstddef>
#include <sstream>
@@ -61,7 +62,8 @@ bool check_compaction_input_verions(const
TabletCompactionJobPB& compaction,
if (!job_pb.has_schema_change() ||
!job_pb.schema_change().has_alter_version()) {
return true;
}
- if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) {
+ if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE ||
+ compaction.type() == TabletCompactionJobPB::STOP_TOKEN) {
return true;
}
if (compaction.input_versions_size() != 2 ||
@@ -192,11 +194,27 @@ void start_compaction_job(MetaServiceCode& code,
std::string& msg, std::stringst
}), compactions.end());
// clang-format on
// Check conflict job
+ if (std::ranges::any_of(compactions, [](const auto& c) {
+ return c.type() == TabletCompactionJobPB::STOP_TOKEN;
+ })) {
+ auto it = std::ranges::find_if(compactions, [](const auto& c) {
+ return c.type() == TabletCompactionJobPB::STOP_TOKEN;
+ });
+ msg = fmt::format(
+ "compactions are not allowed on tablet_id={} currently,
blocked by schema "
+ "change job delete_bitmap_initiator={}",
+ tablet_id, it->delete_bitmap_lock_initiator());
+ code = MetaServiceCode::JOB_TABLET_BUSY;
+ return;
+ }
if (compaction.type() == TabletCompactionJobPB::FULL) {
// Full compaction is generally used for data correctness repair
// for MOW table, so priority should be given to performing full
// compaction operations and canceling other types of compaction.
compactions.Clear();
+ } else if (compaction.type() == TabletCompactionJobPB::STOP_TOKEN) {
+ // fail all existing compactions
+ compactions.Clear();
} else if ((!compaction.has_check_input_versions_range() &&
compaction.input_versions().empty()) ||
(compaction.has_check_input_versions_range() &&
@@ -1111,6 +1129,25 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
if (!new_tablet_job_val.empty()) {
+ auto& compactions = *new_recorded_job.mutable_compaction();
+ auto origin_size = compactions.size();
+ compactions.erase(
+ std::remove_if(
+ compactions.begin(), compactions.end(),
+ [&](auto& c) {
+ return
c.has_delete_bitmap_lock_initiator() &&
+ c.delete_bitmap_lock_initiator() ==
+
schema_change.delete_bitmap_lock_initiator();
+ }),
+ compactions.end());
+ if (compactions.size() < origin_size) {
+ INSTANCE_LOG(INFO)
+ << "remove " << (origin_size - compactions.size())
+ << " STOP_TOKEN for schema_change job tablet_id="
<< tablet_id
+ << " delete_bitmap_lock_initiator="
+ << schema_change.delete_bitmap_lock_initiator()
+ << " key=" << hex(job_key);
+ }
new_recorded_job.clear_schema_change();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
@@ -1150,7 +1187,28 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
return;
}
if (schema_change.alter_version() < 2) { // no need to update stats
- // TODO(cyx): clear schema_change job?
+ // TODO(cyx): clear schema_change
job?
+ if (!new_tablet_job_val.empty()) {
+ auto& compactions = *new_recorded_job.mutable_compaction();
+ auto origin_size = compactions.size();
+ compactions.erase(
+ std::remove_if(compactions.begin(), compactions.end(),
+ [&](auto& c) {
+ return
c.has_delete_bitmap_lock_initiator() &&
+ c.delete_bitmap_lock_initiator()
==
+
schema_change.delete_bitmap_lock_initiator();
+ }),
+ compactions.end());
+ if (compactions.size() < origin_size) {
+ INSTANCE_LOG(INFO)
+ << "remove " << (origin_size - compactions.size())
+ << " STOP_TOKEN for schema_change job tablet_id=" <<
tablet_id
+ << " delete_bitmap_lock_initiator="
+ << schema_change.delete_bitmap_lock_initiator() << "
key=" << hex(job_key);
+ }
+ new_tablet_job_val = new_recorded_job.SerializeAsString();
+ txn->put(new_tablet_job_key, new_tablet_job_val);
+ }
need_commit = true;
return;
}
@@ -1287,6 +1345,23 @@ void process_schema_change_job(MetaServiceCode& code,
std::string& msg, std::str
auto job_val = recorded_job.SerializeAsString();
txn->put(job_key, job_val);
if (!new_tablet_job_val.empty()) {
+ auto& compactions = *new_recorded_job.mutable_compaction();
+ auto origin_size = compactions.size();
+ compactions.erase(
+ std::remove_if(compactions.begin(), compactions.end(),
+ [&](auto& c) {
+ return c.has_delete_bitmap_lock_initiator()
&&
+ c.delete_bitmap_lock_initiator() ==
+
schema_change.delete_bitmap_lock_initiator();
+ }),
+ compactions.end());
+ if (compactions.size() < origin_size) {
+ INSTANCE_LOG(INFO) << "remove " << (origin_size -
compactions.size())
+ << " STOP_TOKEN for schema_change job
tablet_id=" << tablet_id
+ << " delete_bitmap_lock_initiator="
+ << schema_change.delete_bitmap_lock_initiator()
+ << " key=" << hex(job_key);
+ }
new_recorded_job.clear_schema_change();
new_tablet_job_val = new_recorded_job.SerializeAsString();
txn->put(new_tablet_job_key, new_tablet_job_val);
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index ff0279990ee..28ec3ba67d8 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -514,6 +514,7 @@ message TabletCompactionJobPB {
CUMULATIVE = 2;
EMPTY_CUMULATIVE = 3; // just update cumulative point
FULL = 4;
+ STOP_TOKEN = 5; // fail existing compactions and deny newly incoming
compactions
}
// IP and port of the node which initiates this job
optional string initiator = 1; // prepare
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out
new file mode 100644
index 00000000000..4f02f6683a2
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out
differ
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy
new file mode 100644
index 00000000000..467e9fddb43
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_cloud_mow_new_tablet_compaction", "nonConcurrent") { // Checks that cumulative compaction on the new tablet does not succeed while a schema change is in progress, and that no duplicate keys show up afterwards.
+ if (!isCloudMode()) { // the whole suite is skipped outside cloud mode
+ return
+ }
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def customBeConfig = [
+ enable_new_tablet_do_compaction : true
+ ]
+
+ setBeConfigTemporary(customBeConfig) {
+ def table1 = "test_cloud_mow_new_tablet_compaction"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1,1);"
+ sql "insert into ${table1} values(2,2,2,2);"
+ sql "insert into ${table1} values(3,3,3,2);"
+ sql "sync;"
+ qt_sql "select * from ${table1} order by k1;"
+
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from ${table1};")
+ assert tabletStats.size() == 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) { // find the backend row that hosts the single tablet
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock")
+ sql "alter table ${table1} modify column c1 varchar(100);"
+
+ Thread.sleep(3000)
+
+ tabletStats = sql_return_maparray("show tablets from ${table1};")
+ def newTabletId = "-1"
+ for (def stat : tabletStats) { // the new tablet is the first one whose id differs from the original tablet
+ if (stat.TabletId != tabletId) {
+ newTabletId = stat.TabletId
+ break
+ }
+ }
+
+ logger.info("new_tablet_id=${newTabletId}")
+
+ int start_ver = 5 // version range passed to the pick_input_rowsets debug point below
+ int end_ver = 4 // incremented once for each insert started below
+
+ // these loads skip calculating delete bitmaps in the publish phase on the
new tablet because it's in NOT_READY state
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+
GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ def threads = []
+ threads << Thread.start { sql "insert into ${table1}
values(1,99,99,99),(3,99,99,99);"}
+ ++end_ver
+ Thread.sleep(200)
+ threads << Thread.start { sql "insert into ${table1}
values(5,88,88,88),(1,88,88,88);" }
+ ++end_ver
+ Thread.sleep(200)
+ threads << Thread.start { sql "insert into ${table1}
values(3,77,77,77),(5,77,77,77);" }
+ ++end_ver
+ Thread.sleep(2000)
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")
+
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")
+ threads.each { it.join() }
+
+
+ // let the schema change capture these rowsets when it computes incremental rowsets
without lock
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+ Thread.sleep(1000)
+
+ // attempt cumulative compaction over these rowsets on the new tablet
+ // this can happen when enable_new_tablet_do_compaction=true
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id:"${newTabletId}", start_version: start_ver,
end_version: end_ver]);
+ {
+ // trigger cumulative compaction on the new tablet; the reported status must not be "success"
+ logger.info("trigger cumu compaction on tablet=${newTabletId}
BE.Host=${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+ def (code, out, err) =
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort,
newTabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
+ assert code == 0
+ def compactJson = parseJson(out.trim())
+ assert "success" != compactJson.status.toLowerCase()
+ }
+
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock")
+ // wait for the schema change to finish
+ waitForSchemaChangeDone {
+ sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}'
ORDER BY createtime DESC LIMIT 1 """
+ time 1000
+ }
+
+ qt_dup_key_count "select k1,count() as cnt from ${table1} group by
k1 having cnt>1;"
+ order_qt_sql "select * from ${table1};"
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally { // always remove the debug points, whether the checks passed or not
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]