This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a6537a90cd [Enhancement] Garbage collection of unused data on remote
storage backend (#10731)
a6537a90cd is described below
commit a6537a90cd4fda01de4a51b7abdc23a06cc8a901
Author: plat1ko <[email protected]>
AuthorDate: Fri Jul 29 14:38:39 2022 +0800
[Enhancement] Garbage collection of unused data on remote storage backend
(#10731)
* [Feature](cold_on_s3) support unused remote rowset gc
* return aborted when skip drop tablet
* perform unused remote rowset gc
---
be/src/agent/task_worker_pool.cpp | 11 +--
be/src/common/config.h | 11 +++
be/src/common/status.h | 5 +-
be/src/io/fs/local_file_reader.cpp | 11 +--
be/src/io/fs/local_file_system.cpp | 35 +++----
be/src/io/fs/local_file_writer.cpp | 21 ++--
be/src/io/fs/s3_file_reader.cpp | 13 ++-
be/src/io/fs/s3_file_system.cpp | 66 +++++++++++--
be/src/io/fs/s3_file_system.h | 1 +
be/src/olap/base_compaction.cpp | 20 ++++
be/src/olap/base_tablet.h | 7 +-
be/src/olap/cumulative_compaction_policy.cpp | 7 +-
be/src/olap/data_dir.cpp | 62 ++++++++++++
be/src/olap/data_dir.h | 4 +
be/src/olap/olap_define.h | 2 +
be/src/olap/rowset/beta_rowset.cpp | 6 ++
be/src/olap/rowset/beta_rowset.h | 4 +
be/src/olap/snapshot_manager.cpp | 12 +++
be/src/olap/storage_engine.cpp | 8 +-
be/src/olap/tablet.cpp | 110 ++++++++++++++-------
be/src/olap/tablet.h | 17 +++-
be/src/olap/tablet_manager.cpp | 18 +++-
be/src/olap/tablet_manager.h | 12 +--
be/src/olap/tablet_meta.cpp | 8 +-
be/src/olap/tablet_meta.h | 19 ++--
be/src/olap/task/engine_clone_task.cpp | 2 +-
be/test/CMakeLists.txt | 1 +
be/test/olap/delete_handler_test.cpp | 8 +-
be/test/olap/delta_writer_test.cpp | 10 +-
.../olap/engine_storage_migration_task_test.cpp | 2 +-
...et_clone_test.cpp => remote_rowset_gc_test.cpp} | 94 +++++++++---------
be/test/olap/tablet_cooldown_test.cpp | 25 ++---
be/test/olap/tablet_mgr_test.cpp | 8 +-
be/test/olap/tablet_test.cpp | 2 +-
.../apache/doris/clone/BackendLoadStatistic.java | 10 +-
.../main/java/org/apache/doris/common/Config.java | 2 +-
gensrc/proto/olap_file.proto | 7 ++
37 files changed, 446 insertions(+), 215 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 40681ce17d..deb39a3a51 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -432,7 +432,8 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback() {
drop_tablet_req.tablet_id, false, &err);
if (dropped_tablet != nullptr) {
Status drop_status =
StorageEngine::instance()->tablet_manager()->drop_tablet(
- drop_tablet_req.tablet_id, drop_tablet_req.replica_id);
+ drop_tablet_req.tablet_id, drop_tablet_req.replica_id,
+ drop_tablet_req.is_drop_table_or_partition);
if (!drop_status.ok()) {
LOG(WARNING) << "drop table failed! signature: " <<
agent_task_req.signature;
error_msgs.push_back("drop table failed!");
@@ -442,11 +443,6 @@ void TaskWorkerPool::_drop_tablet_worker_thread_callback()
{
StorageEngine::instance()->txn_manager()->force_rollback_tablet_related_txns(
dropped_tablet->data_dir()->get_meta(),
drop_tablet_req.tablet_id,
drop_tablet_req.schema_hash,
dropped_tablet->tablet_uid());
- // We remove remote rowset directly.
- // TODO(cyx): do remove in background
- if (drop_tablet_req.is_drop_table_or_partition) {
- dropped_tablet->remove_all_remote_rowsets();
- }
}
} else {
status_code = TStatusCode::NOT_FOUND;
@@ -881,8 +877,7 @@ void
TaskWorkerPool::_update_tablet_meta_worker_thread_callback() {
} else {
LOG(INFO) << "set tablet cooldown resource "
<< tablet_meta_info.storage_policy;
- tablet->tablet_meta()->set_cooldown_resource(
- tablet_meta_info.storage_policy);
+
tablet->tablet_meta()->set_storage_policy(tablet_meta_info.storage_policy);
}
break;
}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d1f43eaeba..06c8bebf2c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -789,6 +789,17 @@ CONF_Int32(s3_transfer_executor_pool_size, "2");
CONF_Bool(enable_time_lut, "true");
+#ifdef BE_TEST
+// S3 settings for tests (only defined when BE_TEST is set)
+CONF_String(test_s3_resource, "resource");
+CONF_String(test_s3_ak, "ak");
+CONF_String(test_s3_sk, "sk");
+CONF_String(test_s3_endpoint, "endpoint");
+CONF_String(test_s3_region, "region");
+CONF_String(test_s3_bucket, "bucket");
+CONF_String(test_s3_prefix, "prefix");
+#endif
+
} // namespace config
} // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 7604d0aa30..bf947cb812 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -283,10 +283,11 @@ public:
template <typename... Args>
static Status ErrorFmt(TStatusCode::type code, const std::string& fmt,
Args&&... args) {
// In some cases, fmt contains '{}' but there are no args.
- if (sizeof...(args) == 0) {
+ if constexpr (sizeof...(args) == 0) {
return Status(code, fmt);
+ } else {
+ return Status(code, fmt::format(fmt, std::forward<Args>(args)...));
}
- return Status(code, fmt::format(fmt, std::forward<Args>(args)...));
}
template <typename... Args>
diff --git a/be/src/io/fs/local_file_reader.cpp
b/be/src/io/fs/local_file_reader.cpp
index 984306bf17..ae5bd44ab9 100644
--- a/be/src/io/fs/local_file_reader.cpp
+++ b/be/src/io/fs/local_file_reader.cpp
@@ -50,9 +50,8 @@ Status LocalFileReader::close() {
Status LocalFileReader::read_at(size_t offset, Slice result, size_t*
bytes_read) {
DCHECK(!closed());
if (offset > _file_size) {
- return Status::IOError(
- fmt::format("offset exceeds file size(offset: {), file size:
{}, path: {})", offset,
- _file_size, _path.native()));
+ return Status::IOError("offset exceeds file size(offset: {), file
size: {}, path: {})",
+ offset, _file_size, _path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
@@ -62,12 +61,10 @@ Status LocalFileReader::read_at(size_t offset, Slice
result, size_t* bytes_read)
while (bytes_req != 0) {
auto res = ::pread(_fd, to, bytes_req, offset);
if (UNLIKELY(-1 == res && errno != EINTR)) {
- return Status::IOError(
- fmt::format("cannot read from {}: {}", _path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot read from {}: {}", _path.native(),
std::strerror(errno));
}
if (UNLIKELY(res == 0)) {
- return Status::IOError(
- fmt::format("cannot read from {}: unexpected EOF",
_path.native()));
+ return Status::IOError("cannot read from {}: unexpected EOF",
_path.native());
}
if (res > 0) {
to += res;
diff --git a/be/src/io/fs/local_file_system.cpp
b/be/src/io/fs/local_file_system.cpp
index 3ba889c94d..46098e060f 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -40,8 +40,7 @@ Status LocalFileSystem::create_file(const Path& path,
FileWriterPtr* writer) {
auto fs_path = absolute_path(path);
int fd = ::open(fs_path.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC,
0666);
if (-1 == fd) {
- return Status::IOError(
- fmt::format("cannot open {}: {}", fs_path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot open {}: {}", fs_path.native(),
std::strerror(errno));
}
*writer = std::make_unique<LocalFileWriter>(std::move(fs_path), fd);
return Status::OK();
@@ -62,14 +61,16 @@ Status LocalFileSystem::open_file(const Path& path,
FileReaderSPtr* reader) {
Status LocalFileSystem::delete_file(const Path& path) {
auto fs_path = absolute_path(path);
+ if (!std::filesystem::exists(fs_path)) {
+ return Status::OK();
+ }
if (!std::filesystem::is_regular_file(fs_path)) {
- return Status::IOError(fmt::format("{} is not a file",
fs_path.native()));
+ return Status::IOError("{} is not a file", fs_path.native());
}
std::error_code ec;
std::filesystem::remove(fs_path, ec);
if (ec) {
- return Status::IOError(
- fmt::format("cannot delete {}: {}", fs_path.native(),
std::strerror(ec.value())));
+ return Status::IOError("cannot delete {}: {}", fs_path.native(),
std::strerror(ec.value()));
}
return Status::OK();
}
@@ -77,35 +78,36 @@ Status LocalFileSystem::delete_file(const Path& path) {
Status LocalFileSystem::create_directory(const Path& path) {
auto fs_path = absolute_path(path);
if (std::filesystem::exists(fs_path)) {
- return Status::IOError(fmt::format("{} exists", fs_path.native()));
+ return Status::IOError("{} exists", fs_path.native());
}
std::error_code ec;
std::filesystem::create_directories(fs_path, ec);
if (ec) {
- return Status::IOError(
- fmt::format("cannot create {}: {}", fs_path.native(),
std::strerror(ec.value())));
+ return Status::IOError("cannot create {}: {}", fs_path.native(),
std::strerror(ec.value()));
}
return Status::OK();
}
Status LocalFileSystem::delete_directory(const Path& path) {
auto fs_path = absolute_path(path);
+ if (!std::filesystem::exists(fs_path)) {
+ return Status::OK();
+ }
if (!std::filesystem::is_directory(fs_path)) {
- return Status::IOError(fmt::format("{} is not a directory",
fs_path.native()));
+ return Status::IOError("{} is not a directory", fs_path.native());
}
std::error_code ec;
std::filesystem::remove_all(fs_path, ec);
if (ec) {
- return Status::IOError(
- fmt::format("cannot delete {}: {}", fs_path.native(),
std::strerror(ec.value())));
+ return Status::IOError("cannot delete {}: {}", fs_path.native(),
std::strerror(ec.value()));
}
return Status::OK();
}
Status LocalFileSystem::link_file(const Path& src, const Path& dest) {
if (::link(src.c_str(), dest.c_str()) != 0) {
- return Status::IOError(fmt::format("fail to create hard link: {}. from
{} to {}",
- std::strerror(errno), src.native(),
dest.native()));
+ return Status::IOError("fail to create hard link: {}. from {} to {}",
std::strerror(errno),
+ src.native(), dest.native());
}
return Status::OK();
}
@@ -121,8 +123,8 @@ Status LocalFileSystem::file_size(const Path& path, size_t*
file_size) const {
std::error_code ec;
*file_size = std::filesystem::file_size(fs_path, ec);
if (ec) {
- return Status::IOError(fmt::format("cannot get file size {}: {}",
fs_path.native(),
- std::strerror(ec.value())));
+ return Status::IOError("cannot get file size {}: {}", fs_path.native(),
+ std::strerror(ec.value()));
}
return Status::OK();
}
@@ -135,8 +137,7 @@ Status LocalFileSystem::list(const Path& path,
std::vector<Path>* files) {
files->push_back(entry.path().filename());
}
if (ec) {
- return Status::IOError(
- fmt::format("cannot list {}: {}", fs_path.native(),
std::strerror(ec.value())));
+ return Status::IOError("cannot list {}: {}", fs_path.native(),
std::strerror(ec.value()));
}
return Status::OK();
}
diff --git a/be/src/io/fs/local_file_writer.cpp
b/be/src/io/fs/local_file_writer.cpp
index 1965ed6be0..95294dbb88 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -37,12 +37,10 @@ Status sync_dir(const io::Path& dirname) {
int fd;
RETRY_ON_EINTR(fd, ::open(dirname.c_str(), O_DIRECTORY | O_RDONLY));
if (-1 == fd) {
- return Status::IOError(
- fmt::format("cannot open {}: {}", dirname.native(),
std::strerror(errno)));
+ return Status::IOError("cannot open {}: {}", dirname.native(),
std::strerror(errno));
}
if (0 != ::fdatasync(fd)) {
- return Status::IOError(
- fmt::format("cannot fdatasync {}: {}", dirname.native(),
std::strerror(errno)));
+ return Status::IOError("cannot fdatasync {}: {}", dirname.native(),
std::strerror(errno));
}
::close(fd);
return Status::OK();
@@ -102,8 +100,7 @@ Status LocalFileWriter::appendv(const Slice* data, size_t
data_cnt) {
ssize_t res;
RETRY_ON_EINTR(res, ::writev(_fd, iov + completed_iov, iov_count));
if (UNLIKELY(res < 0)) {
- return Status::IOError(
- fmt::format("cannot write to {}: {}", _path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot write to {}: {}", _path.native(),
std::strerror(errno));
}
if (LIKELY(res == n_left)) {
@@ -139,8 +136,7 @@ Status LocalFileWriter::finalize() {
#if defined(__linux__)
int flags = SYNC_FILE_RANGE_WRITE;
if (sync_file_range(_fd, 0, 0, flags) < 0) {
- return Status::IOError(
- fmt::format("cannot sync {}: {}", _path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot sync {}: {}", _path.native(),
std::strerror(errno));
}
#endif
}
@@ -153,8 +149,7 @@ Status LocalFileWriter::_close(bool sync) {
}
if (sync && _dirty) {
if (0 != ::fdatasync(_fd)) {
- return Status::IOError(
- fmt::format("cannot fdatasync {}: {}", _path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot fdatasync {}: {}", _path.native(),
std::strerror(errno));
}
RETURN_IF_ERROR(detail::sync_dir(_path.parent_path()));
_dirty = false;
@@ -166,8 +161,7 @@ Status LocalFileWriter::_close(bool sync) {
DorisMetrics::instance()->local_bytes_written_total->increment(_bytes_appended);
if (0 != ::close(_fd)) {
- return Status::IOError(
- fmt::format("cannot close {}: {}", _path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot close {}: {}", _path.native(),
std::strerror(errno));
}
return Status::OK();
}
@@ -182,8 +176,7 @@ Status LocalFileWriter::write_at(size_t offset, const
Slice& data) {
while (bytes_req != 0) {
auto res = ::pwrite(_fd, from, bytes_req, offset);
if (-1 == res && errno != EINTR) {
- return Status::IOError(
- fmt::format("cannot write to {}: {}", _path.native(),
std::strerror(errno)));
+ return Status::IOError("cannot write to {}: {}", _path.native(),
std::strerror(errno));
}
if (res > 0) {
from += res;
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index a75da54bb0..2f16b03db4 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -52,9 +52,8 @@ Status S3FileReader::close() {
Status S3FileReader::read_at(size_t offset, Slice result, size_t* bytes_read) {
DCHECK(!closed());
if (offset > _file_size) {
- return Status::IOError(
- fmt::format("offset exceeds file size(offset: {), file size:
{}, path: {})", offset,
- _file_size, _path.native()));
+ return Status::IOError("offset exceeds file size(offset: {), file
size: {}, path: {})",
+ offset, _file_size, _path.native());
}
size_t bytes_req = result.size;
char* to = result.data;
@@ -75,13 +74,13 @@ Status S3FileReader::read_at(size_t offset, Slice result,
size_t* bytes_read) {
}
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess()) {
- return Status::IOError(fmt::format("failed to read from {}: {}",
_path.native(),
- outcome.GetError().GetMessage()));
+ return Status::IOError("failed to read from {}: {}", _path.native(),
+ outcome.GetError().GetMessage());
}
*bytes_read = outcome.GetResult().GetContentLength();
if (*bytes_read != bytes_req) {
- return Status::IOError(fmt::format("failed to read from {}(bytes read:
{}, bytes req: {})",
- _path.native(), *bytes_read,
bytes_req));
+ return Status::IOError("failed to read from {}(bytes read: {}, bytes
req: {})",
+ _path.native(), *bytes_read, bytes_req);
}
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
return Status::OK();
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index e421863774..65d6a27de7 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -20,7 +20,9 @@
#include <aws/core/utils/threading/Executor.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/transfer/TransferManager.h>
@@ -153,12 +155,13 @@ Status S3FileSystem::delete_file(const Path& path) {
request.WithBucket(_s3_conf.bucket).WithKey(key);
auto outcome = client->DeleteObject(request);
- if (!outcome.IsSuccess()) {
- return Status::IOError("failed to delete object(endpoint={},
bucket={}, key={}): {}",
- _s3_conf.endpoint, _s3_conf.bucket, key,
- outcome.GetError().GetMessage());
+ if (outcome.IsSuccess() ||
+ outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
+ return Status::OK();
}
- return Status::OK();
+ return Status::IOError("failed to delete object(endpoint={}, bucket={},
key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, key,
+ outcome.GetError().GetMessage());
}
Status S3FileSystem::create_directory(const Path& path) {
@@ -166,7 +169,54 @@ Status S3FileSystem::create_directory(const Path& path) {
}
Status S3FileSystem::delete_directory(const Path& path) {
- return Status::NotSupported("not support");
+ auto client = get_client();
+ CHECK_S3_CLIENT(client);
+
+ Aws::S3::Model::ListObjectsV2Request request;
+ auto prefix = get_key(path);
+ request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
+
+ Aws::S3::Model::DeleteObjectsRequest delete_request;
+ delete_request.SetBucket(_s3_conf.bucket);
+ bool is_trucated = false;
+ do {
+ auto outcome = client->ListObjectsV2(request);
+ if (!outcome.IsSuccess()) {
+ return Status::IOError("failed to list objects(endpoint={},
bucket={}, prefix={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, prefix,
+ outcome.GetError().GetMessage());
+ }
+ const auto& result = outcome.GetResult();
+ Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+ objects.reserve(result.GetContents().size());
+ for (const auto& obj : result.GetContents()) {
+ objects.emplace_back().SetKey(obj.GetKey());
+ }
+ if (!objects.empty()) {
+ Aws::S3::Model::Delete del;
+ del.WithObjects(std::move(objects)).SetQuiet(true);
+ delete_request.SetDelete(std::move(del));
+ auto delete_outcome = client->DeleteObjects(delete_request);
+ if (!delete_outcome.IsSuccess()) {
+ return Status::IOError(
+ "failed to delete objects(endpoint={}, bucket={},
prefix={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, prefix,
+ delete_outcome.GetError().GetMessage());
+ }
+ if (!delete_outcome.GetResult().GetErrors().empty()) {
+ const auto& e = delete_outcome.GetResult().GetErrors().front();
+ return Status::IOError("fail to delete object(endpoint={},
bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket,
e.GetKey(),
+ e.GetMessage());
+ }
+ VLOG_TRACE << "delete " << objects.size()
+ << " s3 objects, endpoint: " << _s3_conf.endpoint
+ << ", bucket: " << _s3_conf.bucket << ", prefix: " <<
_s3_conf.prefix;
+ }
+ is_trucated = result.GetIsTruncated();
+ request.SetContinuationToken(result.GetNextContinuationToken());
+ } while (is_trucated);
+ return Status::OK();
}
Status S3FileSystem::link_file(const Path& src, const Path& dest) {
@@ -181,7 +231,7 @@ Status S3FileSystem::exists(const Path& path, bool* res)
const {
auto key = get_key(path);
request.WithBucket(_s3_conf.bucket).WithKey(key);
- auto outcome = _client->HeadObject(request);
+ auto outcome = client->HeadObject(request);
if (outcome.IsSuccess()) {
*res = true;
} else if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
@@ -202,7 +252,7 @@ Status S3FileSystem::file_size(const Path& path, size_t*
file_size) const {
auto key = get_key(path);
request.WithBucket(_s3_conf.bucket).WithKey(key);
- auto outcome = _client->HeadObject(request);
+ auto outcome = client->HeadObject(request);
if (outcome.IsSuccess()) {
*file_size = outcome.GetResult().GetContentLength();
} else {
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index 219ecb3a0d..9eb393996f 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -46,6 +46,7 @@ public:
Status create_directory(const Path& path) override;
+ // Delete all objects whose keys start with the prefix derived from path.
Status delete_directory(const Path& path) override;
Status link_file(const Path& src, const Path& dest) override;
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 1284af6084..307fa2fee0 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -92,6 +92,26 @@ Status BaseCompaction::pick_rowsets_to_compact() {
RETURN_NOT_OK(check_version_continuity(_input_rowsets));
RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
+ // If there are rowsets with delete predicates in the tablet, start_version > 0 implies that some rowsets before
+ // the delete version cannot have these delete predicates applied, which can cause incorrect query results.
+ // So we must abort this base compaction.
+ // A typical scenario is that some rowsets before the cumulative point are on remote storage.
+ if (_input_rowsets.front()->start_version() > 0) {
+ bool has_delete_predicate = false;
+ for (const auto& rs : _input_rowsets) {
+ if (rs->rowset_meta()->has_delete_predicate()) {
+ has_delete_predicate = true;
+ break;
+ }
+ }
+ if (has_delete_predicate) {
+ LOG(WARNING)
+ << "Some rowsets cannot apply delete predicates in base
compaction. tablet_id="
+ << _tablet->tablet_id();
+ return Status::OLAPInternalError(OLAP_ERR_BE_NO_SUITABLE_VERSION);
+ }
+ }
+
if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
// the tablet is with rowset: [0-1], [2-y]
// and [0-1] has no data. in this situation, no need to do base
compaction.
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 84e336fc2e..26b44f9665 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -18,6 +18,7 @@
#pragma once
#include <memory>
+#include <string>
#include "olap/olap_define.h"
#include "olap/tablet_meta.h"
@@ -58,11 +59,9 @@ public:
int16_t shard_id() const;
bool equal(int64_t tablet_id, int32_t schema_hash);
- const io::ResourceId& cooldown_resource() const { return
_tablet_meta->cooldown_resource(); }
+ const std::string& storage_policy() const { return
_tablet_meta->storage_policy(); }
- void set_cooldown_resource(io::ResourceId resource) {
- _tablet_meta->set_cooldown_resource(std::move(resource));
- }
+ void set_storage_policy(const std::string& policy) {
_tablet_meta->set_storage_policy(policy); }
// properties encapsulated in TabletSchema
virtual const TabletSchema& tablet_schema() const;
diff --git a/be/src/olap/cumulative_compaction_policy.cpp
b/be/src/olap/cumulative_compaction_policy.cpp
index 05da215028..874ce60ddf 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -463,10 +463,9 @@ void
NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
void CumulativeCompactionPolicy::pick_candidate_rowsets(
const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>&
rs_version_map,
int64_t cumulative_point, std::vector<RowsetSharedPtr>*
candidate_rowsets) {
- for (auto& it : rs_version_map) {
- // find all rowset version greater than cumulative_point and skip the
create time in skip_window_sec
- if (it.first.first >= cumulative_point && it.second->is_local()) {
- candidate_rowsets->push_back(it.second);
+ for (const auto& [version, rs] : rs_version_map) {
+ if (version.first >= cumulative_point && rs->is_local()) {
+ candidate_rowsets->push_back(rs);
}
}
std::sort(candidate_rowsets->begin(), candidate_rowsets->end(),
Rowset::comparator);
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index 0f7acc749f..baf51034e6 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -40,6 +40,7 @@
#include "io/fs/path.h"
#include "olap/file_helper.h"
#include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta_manager.h"
@@ -820,4 +821,65 @@ Status DataDir::move_to_trash(const std::string&
tablet_path) {
return Status::OK();
}
+void DataDir::perform_remote_rowset_gc() {
+ std::vector<std::pair<std::string, std::string>> gc_kvs;
+ auto traverse_remote_rowset_func = [&gc_kvs](const std::string& key,
+ const std::string& value) ->
bool {
+ gc_kvs.emplace_back(key, value);
+ return true;
+ };
+ _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_ROWSET_GC_PREFIX,
traverse_remote_rowset_func);
+ std::vector<std::string> deleted_keys;
+ for (auto& [key, val] : gc_kvs) {
+ auto rowset_id = key.substr(REMOTE_ROWSET_GC_PREFIX.size());
+ RemoteRowsetGcPB gc_pb;
+ gc_pb.ParseFromString(val);
+ auto fs = io::FileSystemMap::instance()->get(gc_pb.resource_id());
+ if (!fs) {
+ LOG(WARNING) << "Cannot get file system: " << gc_pb.resource_id();
+ continue;
+ }
+ DCHECK(fs->type() != io::FileSystemType::LOCAL);
+ Status st;
+ for (int i = 0; i < gc_pb.num_segments(); ++i) {
+ auto seg_path = BetaRowset::remote_segment_path(gc_pb.tablet_id(),
rowset_id, i);
+ st = fs->delete_file(seg_path);
+ if (!st.ok()) {
+ LOG(WARNING) << st.to_string();
+ break;
+ }
+ }
+ if (st.ok()) {
+ deleted_keys.push_back(std::move(key));
+ }
+ }
+ for (const auto& key : deleted_keys) {
+ _meta->remove(META_COLUMN_FAMILY_INDEX, key);
+ }
+}
+
+void DataDir::perform_remote_tablet_gc() {
+ std::vector<std::pair<std::string, std::string>> tablet_gc_kvs;
+ auto traverse_remote_tablet_func = [&tablet_gc_kvs](const std::string& key,
+ const std::string&
value) -> bool {
+ tablet_gc_kvs.emplace_back(key, value);
+ return true;
+ };
+ _meta->iterate(META_COLUMN_FAMILY_INDEX, REMOTE_TABLET_GC_PREFIX,
traverse_remote_tablet_func);
+ std::vector<std::string> deleted_keys;
+ for (auto& [key, resource] : tablet_gc_kvs) {
+ auto tablet_id = key.substr(REMOTE_TABLET_GC_PREFIX.size());
+ auto fs = io::FileSystemMap::instance()->get(resource);
+ auto st = fs->delete_directory(DATA_PREFIX + "/" + tablet_id);
+ if (st.ok()) {
+ deleted_keys.push_back(std::move(key));
+ } else {
+ LOG(WARNING) << st;
+ }
+ }
+ for (const auto& key : deleted_keys) {
+ _meta->remove(META_COLUMN_FAMILY_INDEX, key);
+ }
+}
+
} // namespace doris
diff --git a/be/src/olap/data_dir.h b/be/src/olap/data_dir.h
index 642058c46f..4cf40fd1bd 100644
--- a/be/src/olap/data_dir.h
+++ b/be/src/olap/data_dir.h
@@ -116,6 +116,10 @@ public:
void perform_path_gc_by_tablet();
+ void perform_remote_rowset_gc();
+
+ void perform_remote_tablet_gc();
+
// check if the capacity reach the limit after adding the incoming data
// return true if limit reached, otherwise, return false.
// TODO(cmy): for now we can not precisely calculate the capacity Doris
used,
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 077ed670f9..65f2de8c8e 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -127,6 +127,8 @@ const std::string TABLET_ID_KEY = "tablet_id";
const std::string ENABLE_BYTE_TO_BASE64 = "byte_to_base64";
const std::string TABLET_ID_PREFIX = "t_";
const std::string ROWSET_ID_PREFIX = "s_";
+const std::string REMOTE_ROWSET_GC_PREFIX = "gc_";
+const std::string REMOTE_TABLET_GC_PREFIX = "tgc_";
#if defined(__GNUC__)
#define OLAP_LIKELY(x) __builtin_expect((x), 1)
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 1329917a82..1e7cbde9f4 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -46,6 +46,12 @@ std::string BetaRowset::local_segment_path(const
std::string& tablet_path,
return fmt::format("{}/{}_{}.dat", tablet_path, rowset_id.to_string(),
segment_id);
}
+std::string BetaRowset::remote_segment_path(int64_t tablet_id, const
std::string& rowset_id,
+ int segment_id) {
+ // data/{tablet_id}/{rowset_id}_{seg_num}.dat
+ return fmt::format("{}/{}/{}_{}.dat", DATA_PREFIX, tablet_id, rowset_id,
segment_id);
+}
+
std::string BetaRowset::remote_segment_path(int64_t tablet_id, const RowsetId&
rowset_id,
int segment_id) {
// data/{tablet_id}/{rowset_id}_{seg_num}.dat
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 9ccaa7b2b9..8d96bfe090 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -19,6 +19,7 @@
#define DORIS_SRC_OLAP_ROWSET_BETA_ROWSET_H_
#include <cstdint>
+#include <string>
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -49,6 +50,9 @@ public:
static std::string remote_segment_path(int64_t tablet_id, const RowsetId&
rowset_id,
int segment_id);
+ static std::string remote_segment_path(int64_t tablet_id, const
std::string& rowset_id,
+ int segment_id);
+
Status split_range(const RowCursor& start_key, const RowCursor& end_key,
uint64_t request_block_row_count, size_t key_num,
std::vector<OlapTuple>* ranges) override;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 74cbb1aa68..390bd66206 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -28,12 +28,14 @@
#include <map>
#include <set>
+#include "common/status.h"
#include "env/env.h"
#include "gen_cpp/Types_constants.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
#include "runtime/thread_context.h"
using std::filesystem::path;
@@ -360,6 +362,9 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
/// make the full snapshot of the tablet.
{
std::shared_lock rdlock(ref_tablet->get_header_lock());
+ if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) {
+ return Status::Aborted("tablet has shutdown");
+ }
if (request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
@@ -422,6 +427,13 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
CHECK(res.ok()) << res;
ref_tablet->generate_tablet_meta_copy_unlocked(new_tablet_meta);
}
+ {
+ std::unique_lock wlock(ref_tablet->get_header_lock());
+ if (ref_tablet->tablet_state() == TABLET_SHUTDOWN) {
+ return Status::Aborted("tablet has shutdown");
+ }
+ ref_tablet->update_self_owned_remote_rowsets(consistent_rowsets);
+ }
std::vector<RowsetMetaSharedPtr> rs_metas;
for (auto& rs : consistent_rowsets) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 9a7c414971..4e28ee7d8b 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -329,7 +329,7 @@ std::vector<DataDir*> StorageEngine::get_stores() {
stores.reserve(_store_map.size());
std::lock_guard<std::mutex> l(_store_lock);
- if (include_unused) {
+ if constexpr (include_unused) {
for (auto& it : _store_map) {
stores.push_back(it.second);
}
@@ -720,6 +720,12 @@ Status StorageEngine::start_trash_sweep(double* usage,
bool ignore_guard) {
// clean unused rowset metas in OlapMeta
_clean_unused_rowset_metas();
+ // clean unused rowsets and tablets in remote storage backends
+ for (auto data_dir : get_stores()) {
+ data_dir->perform_remote_rowset_gc();
+ data_dir->perform_remote_tablet_gc();
+ }
+
return res;
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index c60f3c4d07..290753de9e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -18,6 +18,7 @@
#include "olap/tablet.h"
#include <ctype.h>
+#include <fmt/core.h>
#include <glog/logging.h>
#include <pthread.h>
#include <rapidjson/prettywriter.h>
@@ -1145,10 +1146,8 @@ void
Tablet::pick_candidate_rowsets_to_cumulative_compaction(
void
Tablet::pick_candidate_rowsets_to_base_compaction(vector<RowsetSharedPtr>*
candidate_rowsets) {
std::shared_lock rdlock(_meta_lock);
- // FIXME(cyx): If there are delete predicate rowsets in tablet,
- // remote rowsets cannot apply these delete predicate, which can cause
- // incorrect query result.
for (auto& it : _rs_version_map) {
+ // Do compaction on local rowsets only.
if (it.first.first < _cumulative_point && it.second->is_local()) {
candidate_rowsets->push_back(it.second);
}
@@ -1702,7 +1701,7 @@ Status Tablet::cooldown() {
LOG(WARNING) << "Failed to own cumu_compaction_lock. tablet=" <<
tablet_id();
return Status::OLAPInternalError(OLAP_ERR_BE_TRY_BE_LOCK_ERROR);
}
- auto dest_fs = io::FileSystemMap::instance()->get(cooldown_resource());
+ auto dest_fs = io::FileSystemMap::instance()->get(storage_policy());
if (!dest_fs) {
return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
}
@@ -1716,8 +1715,13 @@ Status Tablet::cooldown() {
auto start = std::chrono::steady_clock::now();
-
RETURN_IF_ERROR(old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()),
- new_rowset_id));
+ auto st =
old_rowset->upload_to(reinterpret_cast<io::RemoteFileSystem*>(dest_fs.get()),
+ new_rowset_id);
+ if (!st.ok()) {
+ record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
+ old_rowset->num_segments());
+ return st;
+ }
auto duration =
std::chrono::duration<float>(std::chrono::steady_clock::now() - start);
LOG(INFO) << "Upload rowset " << old_rowset->version() << " " <<
new_rowset_id.to_string()
@@ -1732,14 +1736,30 @@ Status Tablet::cooldown() {
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
RowsetSharedPtr new_rowset;
- RowsetFactory::create_rowset(&_schema, _tablet_path,
std::move(new_rowset_meta), &new_rowset);
+ RowsetFactory::create_rowset(&_schema, _tablet_path, new_rowset_meta,
&new_rowset);
std::vector to_add {std::move(new_rowset)};
std::vector to_delete {std::move(old_rowset)};
- std::unique_lock meta_wlock(_meta_lock);
- modify_rowsets(to_add, to_delete);
- save_meta();
+ bool has_shutdown = false;
+ {
+ std::unique_lock meta_wlock(_meta_lock);
+ has_shutdown = tablet_state() == TABLET_SHUTDOWN;
+ if (!has_shutdown) {
+ modify_rowsets(to_add, to_delete);
+ if (new_rowset_meta->has_delete_predicate()) {
+ add_delete_predicate(new_rowset_meta->delete_predicate(),
+ new_rowset_meta->start_version());
+ }
+ _self_owned_remote_rowsets.insert(to_add.front());
+ save_meta();
+ }
+ }
+ if (has_shutdown) {
+ record_unused_remote_rowset(new_rowset_id, dest_fs->resource_id(),
+ to_add.front()->num_segments());
+ return Status::Aborted("tablet {} has shutdown", tablet_id());
+ }
return Status::OK();
}
@@ -1763,13 +1783,13 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) {
// std::shared_lock meta_rlock(_meta_lock);
- if (cooldown_resource().empty()) {
+ if (storage_policy().empty()) {
VLOG_DEBUG << "tablet does not need cooldown, tablet id: " <<
tablet_id();
return false;
}
- auto policy =
ExecEnv::GetInstance()->storage_policy_mgr()->get(cooldown_resource());
+ auto policy =
ExecEnv::GetInstance()->storage_policy_mgr()->get(storage_policy());
if (!policy) {
- LOG(WARNING) << "Cannot get storage policy: " << cooldown_resource();
+ LOG(WARNING) << "Cannot get storage policy: " << storage_policy();
return false;
}
auto cooldown_ttl_sec = policy->cooldown_ttl;
@@ -1817,28 +1837,26 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp,
size_t* file_size) {
return false;
}
-void Tablet::remove_all_remote_rowsets() {
- std::unique_lock meta_wlock(_meta_lock);
- DCHECK(_state == TabletState::TABLET_SHUTDOWN);
- Status st;
- for (auto& it : _rs_version_map) {
- auto& rs = it.second;
- if (!rs->is_local()) {
- st = rs->remove();
- LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " <<
rs->version() << " "
- << rs->rowset_id().to_string() << " in
tablet " << tablet_id()
- << ": " << st.to_string();
- }
- }
- for (auto& it : _stale_rs_version_map) {
- auto& rs = it.second;
- if (!rs->is_local()) {
- st = rs->remove();
- LOG_IF(WARNING, !st.ok()) << "Failed to remove rowset " <<
rs->version() << " "
- << rs->rowset_id().to_string() << " in
tablet " << tablet_id()
- << ": " << st.to_string();
- }
+void Tablet::record_unused_remote_rowset(const RowsetId& rowset_id, const
io::ResourceId& resource,
+ int64_t num_segments) {
+ auto gc_key = REMOTE_ROWSET_GC_PREFIX + rowset_id.to_string();
+ RemoteRowsetGcPB gc_pb;
+ gc_pb.set_resource_id(resource);
+ gc_pb.set_tablet_id(tablet_id());
+ gc_pb.set_num_segments(num_segments);
+ WARN_IF_ERROR(
+ _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, gc_key,
gc_pb.SerializeAsString()),
+ fmt::format("Failed to record unused remote rowset(tablet id: {},
rowset id: {})",
+ tablet_id(), rowset_id.to_string()));
+}
+
+Status Tablet::remove_all_remote_rowsets() {
+ DCHECK(_state == TABLET_SHUTDOWN);
+ if (storage_policy().empty()) {
+ return Status::OK();
}
+ auto tablet_gc_key = REMOTE_TABLET_GC_PREFIX + std::to_string(tablet_id());
+ return _data_dir->get_meta()->put(META_COLUMN_FAMILY_INDEX, tablet_gc_key,
storage_policy());
}
const TabletSchema& Tablet::tablet_schema() const {
@@ -1887,4 +1905,28 @@ Status Tablet::lookup_row_key(const Slice& encoded_key,
RowLocation* row_locatio
return Status::NotFound("can't find key in all rowsets");
}
+void Tablet::remove_self_owned_remote_rowsets() {
+ DCHECK(_state == TABLET_SHUTDOWN);
+ for (const auto& rs : _self_owned_remote_rowsets) {
+ DCHECK(!rs->is_local());
+ record_unused_remote_rowset(rs->rowset_id(),
rs->rowset_meta()->resource_id(),
+ rs->num_segments());
+ }
+}
+
+void Tablet::update_self_owned_remote_rowsets(
+ const std::vector<RowsetSharedPtr>& rowsets_in_snapshot) {
+ if (_self_owned_remote_rowsets.empty()) {
+ return;
+ }
+ for (const auto& rs : rowsets_in_snapshot) {
+ if (!rs->is_local()) {
+ auto it = _self_owned_remote_rowsets.find(rs);
+ if (it != _self_owned_remote_rowsets.end()) {
+ _self_owned_remote_rowsets.erase(it);
+ }
+ }
+ }
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 08e5fecd15..accc157d6a 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -22,6 +22,7 @@
#include <set>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <vector>
#include "gen_cpp/AgentService_types.h"
@@ -30,6 +31,7 @@
#include "olap/base_tablet.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/data_dir.h"
+#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
@@ -298,14 +300,22 @@ public:
bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
- // Physically remove remote rowsets.
- void remove_all_remote_rowsets();
+ Status remove_all_remote_rowsets();
// Lookup the row location of `encoded_key`, the function sets
`row_location` on success.
// NOTE: the method only works in unique key model with primary key index,
you will got a
// not supported error in other data model.
Status lookup_row_key(const Slice& encoded_key, RowLocation* row_location,
uint32_t version);
+ void remove_self_owned_remote_rowsets();
+
+ // Erase entries in `_self_owned_remote_rowsets` iff they are in
`rowsets_in_snapshot`.
+ // REQUIRES: the caller holds _meta_lock
+ void update_self_owned_remote_rowsets(const std::vector<RowsetSharedPtr>&
rowsets_in_snapshot);
+
+ void record_unused_remote_rowset(const RowsetId& rowset_id, const
io::ResourceId& resource,
+ int64_t num_segments);
+
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
@@ -397,6 +407,9 @@ private:
int64_t _last_missed_version;
int64_t _last_missed_time_s;
+ // Remote rowsets not shared with other BEs. They can be deleted when the tablet is dropped.
+ std::unordered_set<RowsetSharedPtr> _self_owned_remote_rowsets; // guarded
by _meta_lock
+
DISALLOW_COPY_AND_ASSIGN(Tablet);
public:
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 3ab3c3a99d..f5c2ebdbcf 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -194,7 +194,7 @@ Status TabletManager::_add_tablet_to_map_unlocked(TTabletId
tablet_id,
// If the new tablet is fresher than the existing one, then replace
// the existing tablet with the new one.
// Use default replica_id to ignore whether replica_id is match when
drop tablet.
- RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0,
keep_files),
+ RETURN_NOT_OK_LOG(_drop_tablet_unlocked(tablet_id, /* replica_id */ 0,
keep_files, false),
strings::Substitute("failed to drop old tablet when
add new tablet. "
"tablet_id=$0",
tablet_id));
@@ -356,7 +356,7 @@ TabletSharedPtr
TabletManager::_internal_create_tablet_unlocked(
}
// something is wrong, we need clear environment
if (is_tablet_added) {
- Status status = _drop_tablet_unlocked(new_tablet_id,
request.replica_id, false);
+ Status status = _drop_tablet_unlocked(new_tablet_id,
request.replica_id, false, false);
if (!status.ok()) {
LOG(WARNING) << "fail to drop tablet when create tablet failed.
res=" << res;
}
@@ -425,7 +425,8 @@ TabletSharedPtr
TabletManager::_create_tablet_meta_and_dir_unlocked(
return nullptr;
}
-Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
bool keep_files) {
+Status TabletManager::drop_tablet(TTabletId tablet_id, TReplicaId replica_id,
+ bool is_drop_table_or_partition) {
auto& shard = _get_tablets_shard(tablet_id);
std::lock_guard wrlock(shard.lock);
if (shard.tablets_under_clone.count(tablet_id) > 0) {
@@ -433,12 +434,12 @@ Status TabletManager::drop_tablet(TTabletId tablet_id,
TReplicaId replica_id, bo
return Status::Aborted("aborted");
}
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- return _drop_tablet_unlocked(tablet_id, replica_id, keep_files);
+ return _drop_tablet_unlocked(tablet_id, replica_id, false,
is_drop_table_or_partition);
}
// Drop specified tablet.
Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TReplicaId
replica_id,
- bool keep_files) {
+ bool keep_files, bool
is_drop_table_or_partition) {
LOG(INFO) << "begin drop tablet. tablet_id=" << tablet_id << ",
replica_id=" << replica_id;
DorisMetrics::instance()->drop_tablet_requests_total->increment(1);
@@ -472,6 +473,13 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId
tablet_id, TReplicaId repl
// and the tablet will be loaded at restart time.
// To avoid this exception, we first set the state of the tablet to
`SHUTDOWN`.
to_drop_tablet->set_tablet_state(TABLET_SHUTDOWN);
+ // We must record the path info of unused remote rowsets in OlapMeta before the
+ // tablet state is saved as TABLET_SHUTDOWN in OlapMeta; otherwise, if the BE
+ // shuts down after the tablet state is saved, that path info will be lost.
+ if (is_drop_table_or_partition) {
+ RETURN_IF_ERROR(to_drop_tablet->remove_all_remote_rowsets());
+ } else {
+ to_drop_tablet->remove_self_owned_remote_rowsets();
+ }
to_drop_tablet->save_meta();
{
std::lock_guard<std::shared_mutex> wrdlock(_shutdown_tablets_lock);
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 7b17b59ce3..159b8baefd 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -61,12 +61,9 @@ public:
// task to be fail, even if there is enough space on other disks
Status create_tablet(const TCreateTabletReq& request,
std::vector<DataDir*> stores);
- // Drop a tablet by description
- // If set keep_files == true, files will NOT be deleted when
deconstruction.
- // Return OLAP_SUCCESS, if run ok
- // OLAP_ERR_TABLE_DELETE_NOEXIST_ERROR, if tablet not exist
- // Status::OLAPInternalError(OLAP_ERR_NOT_INITED), if not inited
- Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool
keep_files = false);
+ // Drop a tablet by description.
+ // If `is_drop_table_or_partition` is true, we need to remove all remote
rowsets in this tablet.
+ Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool
is_drop_table_or_partition);
Status drop_tablets_on_error_root_path(const std::vector<TabletInfo>&
tablet_info_vec);
@@ -156,7 +153,8 @@ private:
bool _check_tablet_id_exist_unlocked(TTabletId tablet_id);
- Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id,
bool keep_files);
+ Status _drop_tablet_unlocked(TTabletId tablet_id, TReplicaId replica_id,
bool keep_files,
+ bool is_drop_table_or_partition);
TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id);
TabletSharedPtr _get_tablet_unlocked(TTabletId tablet_id, bool
include_deleted,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 18dd73f8bc..bc88710854 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -206,7 +206,7 @@ TabletMeta::TabletMeta(const TabletMeta& b)
_del_predicates(b._del_predicates),
_in_restore_mode(b._in_restore_mode),
_preferred_rowset_type(b._preferred_rowset_type),
- _cooldown_resource(b._cooldown_resource),
+ _storage_policy(b._storage_policy),
_delete_bitmap(b._delete_bitmap) {};
void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
@@ -461,7 +461,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
_preferred_rowset_type = tablet_meta_pb.preferred_rowset_type();
}
- _cooldown_resource = tablet_meta_pb.storage_policy();
+ _storage_policy = tablet_meta_pb.storage_policy();
if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
_enable_unique_key_merge_on_write =
tablet_meta_pb.enable_unique_key_merge_on_write();
}
@@ -528,7 +528,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
}
- tablet_meta_pb->set_storage_policy(_cooldown_resource);
+ tablet_meta_pb->set_storage_policy(_storage_policy);
tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
{
@@ -753,7 +753,7 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
}
if (a._in_restore_mode != b._in_restore_mode) return false;
if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
- if (a._cooldown_resource != b._cooldown_resource) return false;
+ if (a._storage_policy != b._storage_policy) return false;
return true;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 7275d0430d..34a42ed510 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -26,7 +26,6 @@
#include "common/logging.h"
#include "gen_cpp/olap_file.pb.h"
-#include "io/fs/file_system.h"
#include "olap/delete_handler.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
@@ -186,23 +185,24 @@ public:
bool all_beta() const;
- const io::ResourceId& cooldown_resource() const {
+ const std::string& storage_policy() const {
std::shared_lock<std::shared_mutex> rlock(_meta_lock);
- return _cooldown_resource;
+ return _storage_policy;
}
- void set_cooldown_resource(io::ResourceId resource) {
+ void set_storage_policy(const std::string& policy) {
std::unique_lock<std::shared_mutex> wlock(_meta_lock);
- VLOG_NOTICE << "set tablet_id : " << _table_id << " cooldown resource
from "
- << _cooldown_resource << " to " << resource;
- _cooldown_resource = std::move(resource);
+ VLOG_NOTICE << "set tablet_id : " << _table_id << " storage policy
from " << _storage_policy
+ << " to " << policy;
+ _storage_policy = policy;
}
+
static void init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column);
DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
- bool enable_unique_key_merge_on_write() { return
_enable_unique_key_merge_on_write; }
+ bool enable_unique_key_merge_on_write() const { return
_enable_unique_key_merge_on_write; }
private:
Status _save_meta(DataDir* data_dir);
@@ -238,8 +238,7 @@ private:
bool _in_restore_mode = false;
RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
- // FIXME(cyx): Currently `cooldown_resource` is equivalent to
`storage_policy`.
- io::ResourceId _cooldown_resource;
+ std::string _storage_policy;
// For unique key data model, the feature Merge-on-Write will leverage a
primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load
stage,
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 221be83850..73466cb72e 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -249,7 +249,7 @@ void EngineCloneTask::_set_tablet_info(Status status, bool
is_new_tablet) {
<< ", signature:" << _signature << ", version:"
<< tablet_info.version
<< ", expected_version: " <<
_clone_req.committed_version;
Status drop_status =
StorageEngine::instance()->tablet_manager()->drop_tablet(
- _clone_req.tablet_id, _clone_req.replica_id);
+ _clone_req.tablet_id, _clone_req.replica_id, false);
if (drop_status != Status::OK() &&
drop_status.precise_code() != OLAP_ERR_TABLE_NOT_FOUND) {
// just log
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index d05fe0a976..f503e3b4dc 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -197,6 +197,7 @@ set(OLAP_TEST_FILES
# olap/push_handler_test.cpp
olap/tablet_cooldown_test.cpp
olap/rowid_conversion_test.cpp
+ olap/remote_rowset_gc_test.cpp
)
set(RUNTIME_TEST_FILES
diff --git a/be/test/olap/delete_handler_test.cpp
b/be/test/olap/delete_handler_test.cpp
index b383fbc3a7..7ba23ca2c1 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -278,7 +278,7 @@ protected:
tablet.reset();
dup_tablet.reset();
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
-
_create_tablet.replica_id);
+
_create_tablet.replica_id, false);
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
@@ -443,8 +443,8 @@ protected:
void TearDown() {
// Remove all dir.
tablet.reset();
- k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
- _create_tablet.replica_id);
+ k_engine->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
_create_tablet.replica_id,
+ false);
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
@@ -820,7 +820,7 @@ protected:
tablet.reset();
_delete_handler.finalize();
StorageEngine::instance()->tablet_manager()->drop_tablet(_create_tablet.tablet_id,
-
_create_tablet.replica_id);
+
_create_tablet.replica_id, false);
EXPECT_TRUE(FileUtils::remove_all(config::storage_root_path).ok());
}
diff --git a/be/test/olap/delta_writer_test.cpp
b/be/test/olap/delta_writer_test.cpp
index 3126b30f94..eae41cb320 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -406,7 +406,7 @@ TEST_F(TestDeltaWriter, open) {
EXPECT_EQ(Status::OK(), res);
SAFE_DELETE(delta_writer);
- res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id);
+ res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
}
@@ -527,7 +527,7 @@ TEST_F(TestDeltaWriter, write) {
}
EXPECT_EQ(1, tablet->num_rows());
- res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id);
+ res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
delete delta_writer;
}
@@ -673,7 +673,7 @@ TEST_F(TestDeltaWriter, vec_write) {
}
ASSERT_EQ(1, tablet->num_rows());
- res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id);
+ res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
ASSERT_TRUE(res.ok());
delete delta_writer;
}
@@ -740,7 +740,7 @@ TEST_F(TestDeltaWriter, sequence_col) {
}
EXPECT_EQ(1, tablet->num_rows());
- res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id);
+ res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
delete delta_writer;
}
@@ -833,7 +833,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
}
ASSERT_EQ(1, tablet->num_rows());
- res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id);
+ res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
ASSERT_TRUE(res.ok());
delete delta_writer;
}
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
index b5b3b4d988..a36f0ff0c4 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -259,7 +259,7 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration)
{
EXPECT_NE(tablet3, tablet);
// test case 2 end
- res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id);
+ res = k_engine->tablet_manager()->drop_tablet(request.tablet_id,
request.replica_id, false);
EXPECT_EQ(Status::OK(), res);
delete delta_writer;
}
diff --git a/be/test/olap/tablet_clone_test.cpp
b/be/test/olap/remote_rowset_gc_test.cpp
similarity index 73%
rename from be/test/olap/tablet_clone_test.cpp
rename to be/test/olap/remote_rowset_gc_test.cpp
index 51124f87cf..c02b15c1c4 100644
--- a/be/test/olap/tablet_clone_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -19,11 +19,12 @@
#include <memory>
+#include "common/config.h"
#include "common/status.h"
#include "io/fs/file_system_map.h"
#include "io/fs/s3_file_system.h"
#include "olap/delta_writer.h"
-#include "olap/snapshot_manager.h"
+#include "olap/rowset/beta_rowset.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "runtime/descriptor_helper.h"
@@ -35,47 +36,39 @@ namespace doris {
static StorageEngine* k_engine = nullptr;
-static const std::string kTestDir = "./ut_dir/tablet_clone_test";
-static std::string kSnapshotDir = "./ut_dir/tablet_clone_test/snapshot";
-static const std::string kResourceId = "TabletCloneTest";
-static const int64_t kTabletId = 10005;
-static const int32_t KSchemaHash = 270068377;
-
-static const std::string AK = "ak";
-static const std::string SK = "sk";
-static const std::string ENDPOINT = "endpoint";
-static const std::string REGION = "region";
-static const std::string BUCKET = "bucket";
-static const std::string PREFIX = "prefix";
+static const std::string kTestDir = "./ut_dir/remote_rowset_gc_test";
+static const std::string kResourceId = "RemoteRowsetGcTest";
// remove DISABLED_ when need run this test
-#define TabletCloneTest DISABLED_TabletCloneTest
-#define private public
-class TabletCloneTest : public testing::Test {
+#define RemoteRowsetGcTest DISABLED_RemoteRowsetGcTest
+class RemoteRowsetGcTest : public testing::Test {
public:
static void SetUpTestSuite() {
S3Conf s3_conf;
- s3_conf.ak = AK;
- s3_conf.sk = SK;
- s3_conf.endpoint = ENDPOINT;
- s3_conf.region = REGION;
- s3_conf.bucket = BUCKET;
- s3_conf.prefix = PREFIX;
+ s3_conf.ak = config::test_s3_ak;
+ s3_conf.sk = config::test_s3_sk;
+ s3_conf.endpoint = config::test_s3_endpoint;
+ s3_conf.region = config::test_s3_region;
+ s3_conf.bucket = config::test_s3_bucket;
+ s3_conf.prefix = "remote_rowset_gc_test";
auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf),
kResourceId);
ASSERT_TRUE(s3_fs->connect().ok());
io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
- config::storage_root_path = kTestDir;
+ constexpr uint32_t MAX_PATH_LEN = 1024;
+ char buffer[MAX_PATH_LEN];
+ EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+ config::storage_root_path = std::string(buffer) + "/" + kTestDir;
config::min_file_descriptor_number = 1000;
- FileUtils::remove_all(kTestDir);
- FileUtils::create_dir(kTestDir);
- std::vector<StorePath> paths {{kTestDir, -1}};
+ FileUtils::remove_all(config::storage_root_path);
+ FileUtils::create_dir(config::storage_root_path);
+
+ std::vector<StorePath> paths {{config::storage_root_path, -1}};
EngineOptions options;
options.store_paths = paths;
doris::StorageEngine::open(options, &k_engine);
- k_engine->start_bg_threads();
}
static void TearDownTestSuite() {
@@ -145,9 +138,9 @@ static TDescriptorTable
create_descriptor_tablet_with_sequence_col() {
return desc_tbl_builder.desc_tbl();
}
-TEST_F(TabletCloneTest, convert_rowset_ids_has_file_in_s3) {
+TEST_F(RemoteRowsetGcTest, normal) {
TCreateTabletReq request;
- create_tablet_request_with_sequence_col(kTabletId, KSchemaHash, &request);
+ create_tablet_request_with_sequence_col(10005, 270068377, &request);
Status st = k_engine->create_tablet(request);
ASSERT_EQ(Status::OK(), st);
@@ -161,13 +154,14 @@ TEST_F(TabletCloneTest,
convert_rowset_ids_has_file_in_s3) {
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
- WriteRequest write_req = {kTabletId, KSchemaHash, WriteType::LOAD, 20003,
- 30003, load_id, tuple_desc,
&(tuple_desc->slots())};
+ WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+ 30003, load_id, tuple_desc,
&(tuple_desc->slots())};
DeltaWriter* delta_writer = nullptr;
DeltaWriter::open(&write_req, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
- MemPool pool;
+ MemTracker tracker;
+ MemPool pool(&tracker);
// Tuple 1
{
Tuple* tuple =
reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
@@ -199,27 +193,39 @@ TEST_F(TabletCloneTest,
convert_rowset_ids_has_file_in_s3) {
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
- rowset->rowset_meta()->set_resource_id(kResourceId);
st = k_engine->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
- tablet->tablet_id(),
tablet->schema_hash(),
- tablet->tablet_uid(),
version);
+ write_req.tablet_id,
write_req.schema_hash,
+ tablet_rs.first.tablet_uid,
version);
ASSERT_EQ(Status::OK(), st);
st = tablet->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
}
EXPECT_EQ(1, tablet->num_rows());
- TSnapshotRequest snapshot_req;
- snapshot_req.tablet_id = kTabletId;
- snapshot_req.schema_hash = KSchemaHash;
- bool allow_incremental_clone = false;
- st = SnapshotManager::instance()->_create_snapshot_files(tablet,
snapshot_req, &kSnapshotDir,
-
&allow_incremental_clone);
+ tablet->set_storage_policy(kResourceId);
+ st = tablet->cooldown(); // rowset [0-1]
+ ASSERT_EQ(Status::OK(), st);
+ st = tablet->cooldown(); // rowset [2-2]
ASSERT_EQ(Status::OK(), st);
- st = SnapshotManager::instance()->convert_rowset_ids(kTestDir, kTabletId,
request.replica_id,
- KSchemaHash);
- ASSERT_NE(Status::OK(), st);
+ ASSERT_EQ(DorisMetrics::instance()->upload_rowset_count->value(), 1);
+
delete delta_writer;
+
+ auto fs = io::FileSystemMap::instance()->get(kResourceId);
+ auto rowset = tablet->get_rowset_by_version({2, 2});
+ ASSERT_TRUE(rowset);
+ auto seg_path = BetaRowset::remote_segment_path(10005,
rowset->rowset_id(), 0);
+ bool exists = false;
+ st = fs->exists(seg_path, &exists);
+ ASSERT_EQ(Status::OK(), st);
+ ASSERT_TRUE(exists);
+
+ st = k_engine->tablet_manager()->drop_tablet(10005, 0, true);
+ ASSERT_EQ(Status::OK(), st);
+ tablet->data_dir()->perform_remote_tablet_gc();
+ st = fs->exists(seg_path, &exists);
+ ASSERT_EQ(Status::OK(), st);
+ ASSERT_FALSE(exists);
}
} // namespace doris
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index f20eccd376..eae932e6f0 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -25,7 +25,6 @@
#include "io/fs/s3_file_system.h"
#include "olap/delta_writer.h"
#include "olap/storage_engine.h"
-#include "olap/storage_policy_mgr.h"
#include "olap/tablet.h"
#include "runtime/descriptor_helper.h"
#include "runtime/tuple.h"
@@ -39,25 +38,18 @@ static StorageEngine* k_engine = nullptr;
static const std::string kTestDir = "./ut_dir/tablet_cooldown_test";
static const std::string kResourceId = "TabletCooldownTest";
-static const std::string AK = "ak";
-static const std::string SK = "sk";
-static const std::string ENDPOINT = "endpoint";
-static const std::string REGION = "region";
-static const std::string BUCKET = "bucket";
-static const std::string PREFIX = "tablet_cooldown_test";
-
// remove DISABLED_ when need run this test
#define TabletCooldownTest DISABLED_TabletCooldownTest
class TabletCooldownTest : public testing::Test {
public:
static void SetUpTestSuite() {
S3Conf s3_conf;
- s3_conf.ak = AK;
- s3_conf.sk = SK;
- s3_conf.endpoint = ENDPOINT;
- s3_conf.region = REGION;
- s3_conf.bucket = BUCKET;
- s3_conf.prefix = PREFIX;
+ s3_conf.ak = config::test_s3_ak;
+ s3_conf.sk = config::test_s3_sk;
+ s3_conf.endpoint = config::test_s3_endpoint;
+ s3_conf.region = config::test_s3_region;
+ s3_conf.bucket = config::test_s3_bucket;
+ s3_conf.prefix = "tablet_cooldown_test";
auto s3_fs = std::make_shared<io::S3FileSystem>(std::move(s3_conf),
kResourceId);
ASSERT_TRUE(s3_fs->connect().ok());
io::FileSystemMap::instance()->insert(kResourceId, s3_fs);
@@ -167,7 +159,8 @@ TEST_F(TabletCooldownTest, normal) {
DeltaWriter::open(&write_req, &delta_writer);
ASSERT_NE(delta_writer, nullptr);
- MemPool pool;
+ MemTracker tracker;
+ MemPool pool(&tracker);
// Tuple 1
{
Tuple* tuple =
reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
@@ -208,7 +201,7 @@ TEST_F(TabletCooldownTest, normal) {
}
EXPECT_EQ(1, tablet->num_rows());
- tablet->set_cooldown_resource(kResourceId);
+ tablet->set_storage_policy(kResourceId);
st = tablet->cooldown(); // rowset [0-1]
ASSERT_EQ(Status::OK(), st);
st = tablet->cooldown(); // rowset [2-2]
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index a47c1ddc95..12838e9b22 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -117,7 +117,7 @@ TEST_F(TabletMgrTest, CreateTablet) {
create_st = _tablet_mgr->create_tablet(create_tablet_req, data_dirs);
EXPECT_TRUE(create_st == Status::OK());
- Status drop_st = _tablet_mgr->drop_tablet(111,
create_tablet_req.replica_id);
+ Status drop_st = _tablet_mgr->drop_tablet(111,
create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet.reset();
Status trash_st = _tablet_mgr->start_trash_sweep();
@@ -172,7 +172,7 @@ TEST_F(TabletMgrTest, CreateTabletWithSequence) {
Status check_meta_st = TabletMetaManager::get_meta(_data_dir, 111, 3333,
new_tablet_meta);
EXPECT_TRUE(check_meta_st == Status::OK());
- Status drop_st = _tablet_mgr->drop_tablet(111,
create_tablet_req.replica_id);
+ Status drop_st = _tablet_mgr->drop_tablet(111,
create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet.reset();
Status trash_st = _tablet_mgr->start_trash_sweep();
@@ -206,13 +206,13 @@ TEST_F(TabletMgrTest, DropTablet) {
EXPECT_TRUE(tablet != nullptr);
// drop unexist tablet will be success
- Status drop_st = _tablet_mgr->drop_tablet(1121,
create_tablet_req.replica_id);
+ Status drop_st = _tablet_mgr->drop_tablet(1121,
create_tablet_req.replica_id, false);
EXPECT_TRUE(drop_st == Status::OK());
tablet = _tablet_mgr->get_tablet(111);
EXPECT_TRUE(tablet != nullptr);
// drop exist tablet will be success
- drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id);
+ drop_st = _tablet_mgr->drop_tablet(111, create_tablet_req.replica_id,
false);
EXPECT_TRUE(drop_st == Status::OK());
tablet = _tablet_mgr->get_tablet(111);
EXPECT_TRUE(tablet == nullptr);
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index 2ca5d4450d..077190c3c1 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -281,7 +281,7 @@ TEST_F(TestTablet, cooldown_policy) {
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr));
_tablet->init();
- _tablet->set_cooldown_resource("test_policy_name");
+ _tablet->set_storage_policy("test_policy_name");
_tablet->_rs_version_map[ptr1->version()] = rowset1;
_tablet->_rs_version_map[ptr2->version()] = rowset2;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 85136dd17a..47befaaccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -283,10 +283,12 @@ public class BackendLoadStatistic {
// Else if capacity used percent > 75%, set capacityCoefficient to 1.
// Else, capacityCoefficient changed smoothly from 0.5 to 1 with used
capacity increasing
// Function: (2 * usedCapacityPercent - 0.5)
- loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5
- : (usedCapacityPercent >
Config.capacity_used_percent_high_water ? 1.0
- : (2 * usedCapacityPercent - 0.5));
- loadScore.replicaNumCoefficient = 1 - loadScore.capacityCoefficient;
+ if (!Config.be_rebalancer_fuzzy_test) {
+ loadScore.capacityCoefficient = usedCapacityPercent < 0.5 ? 0.5
+ : (usedCapacityPercent >
Config.capacity_used_percent_high_water ? 1.0
+ : (2 * usedCapacityPercent - 0.5));
+ loadScore.replicaNumCoefficient = 1 -
loadScore.capacityCoefficient;
+ }
loadScore.score = capacityProportion * loadScore.capacityCoefficient
+ replicaNumProportion * loadScore.replicaNumCoefficient;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 26f180ea28..db79c6cbdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1685,7 +1685,7 @@ public class Config extends ConfigBase {
* It's used to test the reliability in single replica case when tablet
scheduling are frequent.
* Default is false.
*/
- @ConfField(mutable = false, masterOnly = true)
+ @ConfField(mutable = true, masterOnly = true)
public static boolean be_rebalancer_fuzzy_test = false;
/**
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index c1ecfac30d..1bea2eb071 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -115,6 +115,13 @@ message RowsetMetaPB {
optional SegmentsOverlapPB segments_overlap_pb = 51 [default =
OVERLAP_UNKNOWN];
}
+// Value of the KV entry recorded for garbage collection of an unused remote rowset.
+message RemoteRowsetGcPB {
+ required string resource_id = 1;
+ required int64 tablet_id = 2;
+ required int64 num_segments = 3;
+}
+
message AlphaRowsetExtraMetaPB {
repeated SegmentGroupPB segment_groups = 1;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]