This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 96711ec2344 branch-3.1: [opt](cloud) Reduce empty rowset pressure on
meta service #54395 #55171 #55604 #55742 #55837 (#55934)
96711ec2344 is described below
commit 96711ec234433a9d187b16b7a3f04c95beee1baa
Author: Xin Liao <[email protected]>
AuthorDate: Mon Sep 15 11:58:17 2025 +0800
branch-3.1: [opt](cloud) Reduce empty rowset pressure on meta service
#54395 #55171 #55604 #55742 #55837 (#55934)
pick from: #54395 #55171 #55604 #55742 #55837
---
be/src/cloud/cloud_base_compaction.cpp | 2 +-
be/src/cloud/cloud_cumulative_compaction.cpp | 2 +-
be/src/cloud/cloud_delta_writer.cpp | 27 +-
be/src/cloud/cloud_delta_writer.h | 3 +
be/src/cloud/cloud_full_compaction.cpp | 2 +-
be/src/cloud/cloud_meta_mgr.cpp | 124 ++++-
be/src/cloud/cloud_meta_mgr.h | 11 +
be/src/cloud/cloud_rowset_builder.cpp | 4 +-
be/src/cloud/cloud_rowset_builder.h | 6 +
be/src/cloud/cloud_schema_change_job.cpp | 1 +
be/src/cloud/cloud_tablet.cpp | 9 +-
be/src/cloud/cloud_tablets_channel.cpp | 7 +-
be/src/cloud/config.cpp | 2 +
be/src/cloud/config.h | 3 +
be/src/common/config.cpp | 2 +
be/src/olap/compaction.cpp | 55 ++-
be/src/olap/compaction.h | 4 +
be/src/olap/rowset/rowset.h | 10 +
be/src/olap/rowset/rowset_meta.h | 4 +
be/test/cloud/cloud_compaction_test.cpp | 190 +++++++-
be/test/cloud/cloud_meta_mgr_test.cpp | 536 +++++++++++++++++++++
cloud/src/meta-service/meta_service.cpp | 2 +-
cloud/src/meta-service/meta_service_job.cpp | 4 +-
cloud/src/recycler/recycler.cpp | 28 +-
cloud/test/recycler_test.cpp | 1 +
gensrc/proto/cloud.proto | 4 +-
.../test_compaction_with_empty_rowset.out | Bin 0 -> 349 bytes
.../test_schema_change_mow_with_empty_rowset.out | Bin 0 -> 5653 bytes
.../pipeline/cloud_p0/conf/be_custom.conf | 2 +-
.../test_compaction_with_empty_rowset.groovy | 101 ++++
...test_schema_change_mow_with_empty_rowset.groovy | 80 +++
31 files changed, 1186 insertions(+), 40 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.cpp
b/be/src/cloud/cloud_base_compaction.cpp
index 5e00721bfc7..bd2d07d41b9 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -344,7 +344,7 @@ Status CloudBaseCompaction::modify_rowsets() {
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
- compaction_job->set_num_input_rowsets(_input_rowsets.size());
+ compaction_job->set_num_input_rowsets(num_input_rowsets());
compaction_job->set_num_output_rowsets(1);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index d2ea6ac0e8a..102129bdd89 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -270,7 +270,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->set_size_output_rowsets(_output_rowset->total_disk_size());
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
- compaction_job->set_num_input_rowsets(_input_rowsets.size());
+ compaction_job->set_num_input_rowsets(num_input_rowsets());
compaction_job->set_num_output_rowsets(1);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
diff --git a/be/src/cloud/cloud_delta_writer.cpp
b/be/src/cloud/cloud_delta_writer.cpp
index 2533028314f..e0f9d203750 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -20,6 +20,7 @@
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_rowset_builder.h"
#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
#include "olap/delta_writer.h"
#include "runtime/thread_context.h"
@@ -108,11 +109,31 @@ const RowsetMetaSharedPtr&
CloudDeltaWriter::rowset_meta() {
Status CloudDeltaWriter::commit_rowset() {
std::lock_guard<bthread::Mutex> lock(_mtx);
+
+ // Handle empty rowset (no data written)
if (!_is_init) {
- // No data to write, but still need to write a empty rowset kv to keep
version continuous
- RETURN_IF_ERROR(_rowset_builder->init());
- RETURN_IF_ERROR(_rowset_builder->build_rowset());
+ return _commit_empty_rowset();
+ }
+
+ // Handle normal rowset with data
+ return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
+}
+
+Status CloudDeltaWriter::_commit_empty_rowset() {
+ // If skip writing empty rowset metadata is enabled,
+ // we do not prepare rowset to meta service.
+ if (config::skip_writing_empty_rowset_metadata) {
+ rowset_builder()->set_skip_writing_rowset_metadata(true);
+ }
+
+ RETURN_IF_ERROR(_rowset_builder->init());
+ RETURN_IF_ERROR(_rowset_builder->build_rowset());
+
+ // If skip writing empty rowset metadata is enabled, we do not commit
rowset to meta service.
+ if (config::skip_writing_empty_rowset_metadata) {
+ return Status::OK();
}
+ // write a empty rowset kv to keep version continuous
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
}
diff --git a/be/src/cloud/cloud_delta_writer.h
b/be/src/cloud/cloud_delta_writer.h
index 4558b04acd1..005be63feb2 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -58,6 +58,9 @@ private:
// Convert `_rowset_builder` from `BaseRowsetBuilder` to
`CloudRowsetBuilder`
CloudRowsetBuilder* rowset_builder();
+ // Handle commit for empty rowset (when no data is written)
+ Status _commit_empty_rowset();
+
bthread::Mutex _mtx;
CloudStorageEngine& _engine;
QueryThreadContext _query_thread_context;
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index ac60c869154..d0d756cb7ea 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -235,7 +235,7 @@ Status CloudFullCompaction::modify_rowsets() {
})
compaction_job->set_num_input_segments(_input_segments);
compaction_job->set_num_output_segments(_output_rowset->num_segments());
- compaction_job->set_num_input_rowsets(_input_rowsets.size());
+ compaction_job->set_num_input_rowsets(num_input_rowsets());
compaction_job->set_num_output_rowsets(1);
compaction_job->add_input_versions(_input_rowsets.front()->start_version());
compaction_job->add_input_versions(_input_rowsets.back()->end_version());
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index fbaf2daf9a3..1364cab47fb 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -160,6 +160,7 @@ bvar::Window<bvar::Adder<uint64_t>>
g_cloud_ms_rpc_timeout_count_window(
"cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count,
30);
bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
"cloud_be_mow_get_dbm_lock_backoff_sleep_time");
+bvar::Adder<uint64_t>
g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
class MetaServiceProxy {
public:
@@ -768,6 +769,12 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
tablet->add_rowsets(std::move(rowsets), version_overlap,
wlock, warmup_delta_data);
RETURN_IF_ERROR(tablet->merge_rowsets_schema());
}
+
+ // Fill version holes
+ int64_t partition_max_version =
+ resp.has_partition_max_version() ?
resp.partition_max_version() : -1;
+ RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version,
wlock));
+
tablet->last_base_compaction_success_time_ms =
stats.last_base_compaction_time_ms();
tablet->last_cumu_compaction_success_time_ms =
stats.last_cumu_compaction_time_ms();
tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
@@ -1675,5 +1682,120 @@ Status CloudMetaMgr::get_schema_dict(int64_t index_id,
return Status::OK();
}
-#include "common/compile_check_end.h"
+Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t
max_version,
+ std::unique_lock<std::shared_mutex>&
wlock) {
+ if (max_version <= 0) {
+ return Status::OK();
+ }
+
+ Versions existing_versions;
+ for (const auto& rs : tablet->tablet_meta()->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+
+ // If there are no existing versions, it may be a new tablet for restore,
so skip filling holes.
+ if (existing_versions.empty()) {
+ return Status::OK();
+ }
+
+ std::vector<RowsetSharedPtr> hole_rowsets;
+ // sort the existing versions in ascending order
+ std::sort(existing_versions.begin(), existing_versions.end(),
+ [](const Version& a, const Version& b) {
+ // simple because 2 versions are certainly not overlapping
+ return a.first < b.first;
+ });
+
+ int64_t last_version = -1;
+ for (const Version& version : existing_versions) {
+ // missing versions are those that are not in the existing_versions
+ if (version.first > last_version + 1) {
+ // there is a hole between versions
+ auto prev_non_hole_rowset = tablet->get_rowset_by_version(version);
+ for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
+ RowsetSharedPtr hole_rowset;
+ RETURN_IF_ERROR(create_empty_rowset_for_hole(
+ tablet, ver, prev_non_hole_rowset->rowset_meta(),
&hole_rowset));
+ hole_rowsets.push_back(hole_rowset);
+ }
+ LOG(INFO) << "Created empty rowset for version hole, from " <<
last_version + 1
+ << " to " << version.first - 1 << " for tablet " <<
tablet->tablet_id();
+ }
+ last_version = version.second;
+ }
+
+ if (last_version + 1 <= max_version) {
+ LOG(INFO) << "Created empty rowset for version hole, from " <<
last_version + 1 << " to "
+ << max_version << " for tablet " << tablet->tablet_id();
+ for (; last_version + 1 <= max_version; ++last_version) {
+ RowsetSharedPtr hole_rowset;
+ auto prev_non_hole_rowset =
tablet->get_rowset_by_version(existing_versions.back());
+ RETURN_IF_ERROR(create_empty_rowset_for_hole(
+ tablet, last_version + 1,
prev_non_hole_rowset->rowset_meta(), &hole_rowset));
+ hole_rowsets.push_back(hole_rowset);
+ }
+ }
+
+ if (!hole_rowsets.empty()) {
+ size_t hole_count = hole_rowsets.size();
+ tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false);
+ g_cloud_version_hole_filled_count << hole_count;
+ }
+ return Status::OK();
+}
+
+Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t
version,
+ RowsetMetaSharedPtr
prev_rowset_meta,
+ RowsetSharedPtr* rowset) {
+ // Create a RowsetMeta for the empty rowset
+ auto rs_meta = std::make_shared<RowsetMeta>();
+
+ // Generate a deterministic rowset ID for the hole (same tablet_id +
version = same rowset_id)
+ RowsetId hole_rowset_id;
+ hole_rowset_id.init(2, 0, tablet->tablet_id(), version);
+ rs_meta->set_rowset_id(hole_rowset_id);
+
+ // Generate a deterministic load_id for the hole rowset (same tablet_id +
version = same load_id)
+ PUniqueId load_id;
+ load_id.set_hi(tablet->tablet_id());
+ load_id.set_lo(version);
+ rs_meta->set_load_id(load_id);
+
+ // Copy schema and other metadata from template
+ rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema());
+ rs_meta->set_rowset_type(prev_rowset_meta->rowset_type());
+ rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash());
+ rs_meta->set_resource_id(prev_rowset_meta->resource_id());
+
+ // Basic tablet information
+ rs_meta->set_tablet_id(tablet->tablet_id());
+ rs_meta->set_index_id(tablet->index_id());
+ rs_meta->set_partition_id(tablet->partition_id());
+ rs_meta->set_tablet_uid(tablet->tablet_uid());
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_txn_id(version);
+
+ rs_meta->set_num_rows(0);
+ rs_meta->set_total_disk_size(0);
+ rs_meta->set_data_disk_size(0);
+ rs_meta->set_index_disk_size(0);
+ rs_meta->set_empty(true);
+ rs_meta->set_num_segments(0);
+ rs_meta->set_segments_overlap(NONOVERLAPPING);
+ rs_meta->set_rowset_state(VISIBLE);
+ rs_meta->set_creation_time(UnixSeconds());
+ rs_meta->set_newest_write_timestamp(UnixSeconds());
+
+ Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset);
+ if (!s.ok()) {
+ LOG_WARNING("Failed to create empty rowset for hole")
+ .tag("tablet_id", tablet->tablet_id())
+ .tag("version", version)
+ .error(s);
+ return s;
+ }
+ (*rowset)->set_hole_rowset(true);
+
+ return Status::OK();
+}
} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 2794a0a815e..bd3acbf8215 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -26,6 +26,7 @@
#include <vector>
#include "common/status.h"
+#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_meta.h"
#include "util/s3_util.h"
@@ -151,6 +152,15 @@ public:
void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id,
int64_t initiator,
int64_t tablet_id);
+ // Fill version holes by creating empty rowsets for missing versions
+ Status fill_version_holes(CloudTablet* tablet, int64_t max_version,
+ std::unique_lock<std::shared_mutex>& wlock);
+
+ // Create an empty rowset to fill a version hole
+ Status create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
+ RowsetMetaSharedPtr prev_rowset_meta,
+ RowsetSharedPtr* rowset);
+
private:
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t
old_max_version,
std::ranges::range auto&& rs_metas,
@@ -160,6 +170,7 @@ private:
std::ranges::range auto&& rs_metas, const
TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap*
delete_bitmap,
bool full_sync = false, SyncRowsetStats*
sync_stats = nullptr);
+
void check_table_size_correctness(const RowsetMeta& rs_meta);
int64_t get_segment_file_size(const RowsetMeta& rs_meta);
int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta);
diff --git a/be/src/cloud/cloud_rowset_builder.cpp
b/be/src/cloud/cloud_rowset_builder.cpp
index 3da6a55aa44..a8c0ad87980 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -80,7 +80,9 @@ Status CloudRowsetBuilder::init() {
_calc_delete_bitmap_token =
_engine.calc_delete_bitmap_executor()->create_token();
-
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(),
""));
+ if (!_skip_writing_rowset_metadata) {
+
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(),
""));
+ }
_is_init = true;
return Status::OK();
diff --git a/be/src/cloud/cloud_rowset_builder.h
b/be/src/cloud/cloud_rowset_builder.h
index 05e24e66382..afa5d7c7574 100644
--- a/be/src/cloud/cloud_rowset_builder.h
+++ b/be/src/cloud/cloud_rowset_builder.h
@@ -39,6 +39,8 @@ public:
Status set_txn_related_delete_bitmap();
+ void set_skip_writing_rowset_metadata(bool skip) {
_skip_writing_rowset_metadata = skip; }
+
private:
// Convert `_tablet` from `BaseTablet` to `CloudTablet`
CloudTablet* cloud_tablet();
@@ -46,6 +48,10 @@ private:
Status check_tablet_version_count();
CloudStorageEngine& _engine;
+
+ // whether to skip writing rowset metadata to meta service.
+ // This is used for empty rowset when
config::skip_writing_empty_rowset_metadata is true.
+ bool _skip_writing_rowset_metadata = false;
};
} // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 9e8d3f85a08..ec4c8224050 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -459,6 +459,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t
alter_version,
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
initiator));
TabletMetaSharedPtr tmp_meta =
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
tmp_meta->delete_bitmap()->delete_bitmap.clear();
+ tmp_meta->clear_rowsets();
std::shared_ptr<CloudTablet> tmp_tablet =
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 23232fb1563..d38ae02ae25 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -661,7 +661,6 @@ std::vector<RecycledRowsets>
CloudTablet::recycle_cached_data(
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t
num_segments,
int64_t num_rows, int64_t data_size)
{
- _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
_approximate_num_segments.store(num_segments, std::memory_order_relaxed);
_approximate_num_rows.store(num_rows, std::memory_order_relaxed);
_approximate_data_size.store(data_size, std::memory_order_relaxed);
@@ -672,10 +671,16 @@ void CloudTablet::reset_approximate_stats(int64_t
num_rowsets, int64_t num_segme
if (v.second < cp) {
continue;
}
-
cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() :
1;
++cumu_num_rowsets;
}
+ // num_rowsets may be less than the size of _rs_version_map when there are
some hole rowsets
+ // in the version map, so we use the max value to ensure that the
approximate number
+ // of rowsets is at least the size of _rs_version_map.
+ // Note that this is not the exact number of rowsets, but an approximate
number.
+ int64_t approximate_num_rowsets =
+ std::max(num_rowsets,
static_cast<int64_t>(_rs_version_map.size()));
+ _approximate_num_rowsets.store(approximate_num_rowsets,
std::memory_order_relaxed);
_approximate_cumu_num_rowsets.store(cumu_num_rowsets,
std::memory_order_relaxed);
_approximate_cumu_num_deltas.store(cumu_num_deltas,
std::memory_order_relaxed);
}
diff --git a/be/src/cloud/cloud_tablets_channel.cpp
b/be/src/cloud/cloud_tablets_channel.cpp
index c38680a81dc..1f99fd53a8b 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -20,6 +20,7 @@
#include "cloud/cloud_delta_writer.h"
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
#include "olap/delta_writer.h"
#include "runtime/tablets_channel.h"
@@ -59,6 +60,7 @@ Status CloudTabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& reques
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
std::unordered_set<int64_t> partition_ids;
+ std::vector<CloudDeltaWriter*> writers;
{
// add_batch may concurrency with inc_open but not under _lock.
// so need to protect it with _tablet_writers_lock.
@@ -69,8 +71,11 @@ Status CloudTabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& reques
return Status::InternalError("unknown tablet to append data,
tablet={}", tablet_id);
}
partition_ids.insert(tablet_writer_it->second->partition_id());
+
writers.push_back(static_cast<CloudDeltaWriter*>(tablet_writer_it->second.get()));
}
- if (!partition_ids.empty()) {
+ if (config::skip_writing_empty_rowset_metadata && !writers.empty()) {
+ RETURN_IF_ERROR(CloudDeltaWriter::batch_init(writers));
+ } else if (!partition_ids.empty()) {
RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
}
}
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index 3db8a758b27..16d6aa7f782 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -90,6 +90,8 @@ DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");
DEFINE_Bool(enable_check_storage_vault, "true");
+DEFINE_mBool(skip_writing_empty_rowset_metadata, "false");
+
DEFINE_mInt64(warmup_tablet_replica_info_cache_ttl_sec, "600");
DEFINE_mInt64(warm_up_rowset_slow_log_ms, "1000");
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index 18a6097c56b..a52ac758e67 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -107,6 +107,9 @@ DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);
DECLARE_mInt32(get_delete_bitmap_lock_max_retry_times);
+// Skip writing empty rowset metadata to meta service
+DECLARE_mBool(skip_writing_empty_rowset_metadata);
+
// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index aba7561d1a3..243cd65351a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -2018,6 +2018,8 @@ Status set_fuzzy_configs() {
((distribution(*generator) % 2) == 0) ? "10" : "4294967295";
fuzzy_field_and_value["max_segment_partial_column_cache_size"] =
((distribution(*generator) % 2) == 0) ? "5" : "10";
+ fuzzy_field_and_value["skip_writing_empty_rowset_metadata"] =
+ ((distribution(*generator) % 2) == 0) ? "true" : "false";
std::uniform_int_distribution<int64_t> distribution2(-2, 10);
fuzzy_field_and_value["segments_key_bounds_truncation_threshold"] =
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index a3ccc407078..34ddf37e5e3 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1431,6 +1431,46 @@ Status CloudCompactionMixin::modify_rowsets() {
return Status::OK();
}
+Status
CloudCompactionMixin::set_storage_resource_from_input_rowsets(RowsetWriterContext&
ctx) {
+ // Set storage resource from input rowsets by iterating backwards to find
the first rowset
+ // with non-empty resource_id. This handles two scenarios:
+ // 1. Hole rowsets compaction: Multiple hole rowsets may lack storage
resource.
+ // Example: [0-1, 2-2, 3-3, 4-4, 5-5] where 2-5 are hole rowsets.
+ // If 0-1 lacks resource_id, then 2-5 also lack resource_id.
+ // 2. Schema change: New tablet may have later version empty rowsets
without resource_id,
+ // but middle rowsets get resource_id after historical rowsets are
converted.
+ // We iterate backwards to find the most recent rowset with valid
resource_id.
+
+ for (const auto& rowset : std::ranges::reverse_view(_input_rowsets)) {
+ const auto& resource_id = rowset->rowset_meta()->resource_id();
+
+ if (!resource_id.empty()) {
+ ctx.storage_resource =
*DORIS_TRY(rowset->rowset_meta()->remote_storage_resource());
+ return Status::OK();
+ }
+
+ // Validate that non-empty rowsets (num_segments > 0) must have valid
resource_id
+ // Only hole rowsets or empty rowsets are allowed to have empty
resource_id
+ if (rowset->num_segments() > 0) {
+ auto error_msg = fmt::format(
+ "Non-empty rowset must have valid resource_id. "
+ "rowset_id={}, version=[{}-{}], is_hole_rowset={},
num_segments={}, "
+ "tablet_id={}, table_id={}",
+ rowset->rowset_id().to_string(), rowset->start_version(),
rowset->end_version(),
+ rowset->is_hole_rowset(), rowset->num_segments(),
_tablet->tablet_id(),
+ _tablet->table_id());
+
+#ifndef BE_TEST
+ DCHECK(false) << error_msg;
+#endif
+
+ return Status::InternalError<false>(error_msg);
+ }
+ }
+
+ return Status::OK();
+}
+
Status
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx) {
// only do index compaction for dup_keys and unique_keys with mow enabled
if (config::inverted_index_compaction_enable &&
@@ -1440,9 +1480,8 @@ Status
CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
construct_index_compaction_columns(ctx);
}
- // Use the storage resource of the previous rowset
- ctx.storage_resource =
-
*DORIS_TRY(_input_rowsets.back()->rowset_meta()->remote_storage_resource());
+ // Use the storage resource of the previous rowset.
+ RETURN_IF_ERROR(set_storage_resource_from_input_rowsets(ctx));
ctx.txn_id =
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
std::numeric_limits<int64_t>::max(); // MUST be positive
@@ -1492,4 +1531,14 @@ Status CloudCompactionMixin::garbage_collection() {
return Status::OK();
}
+// should skip hole rowsets, ortherwise the count will be wrong in ms
+int64_t CloudCompactionMixin::num_input_rowsets() const {
+ int64_t count = 0;
+ for (const auto& r : _input_rowsets) {
+ if (!r->is_hole_rowset()) {
+ count++;
+ }
+ }
+ return count;
+}
} // namespace doris
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index de295a8e7ff..f2b9e86c4bb 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -189,6 +189,8 @@ public:
int64_t initiator() const;
+ int64_t num_input_rowsets() const;
+
protected:
CloudTablet* cloud_tablet() { return
static_cast<CloudTablet*>(_tablet.get()); }
@@ -203,6 +205,8 @@ protected:
private:
Status construct_output_rowset_writer(RowsetWriterContext& ctx) override;
+ Status set_storage_resource_from_input_rowsets(RowsetWriterContext& ctx);
+
Status execute_compact_impl(int64_t permits);
Status build_basic_info();
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 3d2a3d965f0..9e4ceb870cc 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -318,6 +318,11 @@ public:
std::vector<std::string> get_index_file_names();
+ // check if the rowset is a hole rowset
+ bool is_hole_rowset() const { return _is_hole_rowset; }
+ // set the rowset as a hole rowset
+ void set_hole_rowset(bool is_hole_rowset) { _is_hole_rowset =
is_hole_rowset; }
+
protected:
friend class RowsetFactory;
@@ -361,6 +366,11 @@ protected:
// <column_uniq_id>, skip index compaction
std::set<int32_t> skip_index_compaction;
+
+ // only used for cloud mode, it indicates whether this rowset is a hole
rowset.
+ // a hole rowset is a rowset that has no data, but is used to fill the
version gap
+ // it is used to ensure that the version sequence is continuous.
+ bool _is_hole_rowset = false;
};
// `rs_metas` MUST already be sorted by `RowsetMeta::comparator`
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 1ec5b30f0fe..8d79037b87c 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -68,6 +68,10 @@ public:
const std::string& resource_id() const { return
_rowset_meta_pb.resource_id(); }
+ void set_resource_id(const std::string& resource_id) {
+ _rowset_meta_pb.set_resource_id(resource_id);
+ }
+
bool is_local() const { return !_rowset_meta_pb.has_resource_id(); }
bool has_variant_type_in_schema() const;
diff --git a/be/test/cloud/cloud_compaction_test.cpp
b/be/test/cloud/cloud_compaction_test.cpp
index 37847025c9e..049da4b15ae 100644
--- a/be/test/cloud/cloud_compaction_test.cpp
+++ b/be/test/cloud/cloud_compaction_test.cpp
@@ -27,11 +27,11 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
-#include "gtest/gtest_pred_impl.h"
#include "json2pb/json_to_pb.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_policy.h"
#include "olap/tablet_meta.h"
#include "util/uid_util.h"
@@ -197,4 +197,192 @@ TEST_F(CloudCompactionTest,
failure_cumu_compaction_tablet_sleep_test) {
ASSERT_EQ(st, Status::OK());
ASSERT_EQ(tablets.size(), 0);
}
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool
overlapping,
+ int data_size) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET); // important
+ rs_meta->_rowset_meta_pb.set_start_version(version.first);
+ rs_meta->_rowset_meta_pb.set_end_version(version.second);
+ rs_meta->set_num_segments(num_segments);
+ rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+ rs_meta->set_total_disk_size(data_size);
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+}
+
+class TestableCloudCompaction : public CloudCompactionMixin {
+public:
+ TestableCloudCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
+ : CloudCompactionMixin(engine, tablet, "test_compaction") {}
+
+ // Set input rowsets for testing
+ void set_input_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+ _input_rowsets = rowsets;
+ }
+
+ Status prepare_compact() override { return Status::OK(); }
+
+ ReaderType compaction_type() const override { return
ReaderType::READER_CUMULATIVE_COMPACTION; }
+
+ std::string_view compaction_name() const override { return
"test_compaction"; }
+};
+
+TEST_F(CloudCompactionTest, test_set_storage_resource_from_input_rowsets) {
+ S3Conf s3_conf {.bucket = "bucket",
+ .prefix = "prefix",
+ .client_conf = {
+ .endpoint = "endpoint",
+ .region = "region",
+ .ak = "ak",
+ .sk = "sk",
+ .token = "",
+ .bucket = "",
+ .role_arn = "",
+ .external_id = "",
+ }};
+ std::string resource_id = "10000";
+ auto res = io::S3FileSystem::create(std::move(s3_conf), resource_id);
+ ASSERT_TRUE(res.has_value()) << res.error();
+ auto fs = res.value();
+ StorageResource storage_resource(fs);
+
+ CloudTabletSPtr tablet = std::make_shared<CloudTablet>(_engine,
_tablet_meta);
+ TestableCloudCompaction compaction(_engine, tablet);
+
+ // Test case 1: All rowsets are empty (num_segments = 0) - should succeed
+ {
+ std::vector<RowsetSharedPtr> rowsets;
+
+ RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
+ ASSERT_TRUE(rowset1 != nullptr);
+ rowset1->set_hole_rowset(true); // Mark as hole rowset since
num_segments=0
+ rowsets.push_back(rowset1);
+
+ RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+ ASSERT_TRUE(rowset2 != nullptr);
+ rowset2->set_hole_rowset(true); // Mark as hole rowset since
num_segments=0
+ rowsets.push_back(rowset2);
+
+ compaction.set_input_rowsets(rowsets);
+
+ RowsetWriterContext ctx;
+ Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ // No storage resource should be set since no rowset has resource_id
+ ASSERT_FALSE(ctx.storage_resource.has_value());
+ }
+
+ // Test case 2: Backward iteration - last rowset has resource_id
+ {
+ std::vector<RowsetSharedPtr> rowsets;
+
+ // First rowset: empty, no resource_id
+ RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 0, false, 41);
+ ASSERT_TRUE(rowset1 != nullptr);
+ rowset1->set_hole_rowset(true);
+ rowsets.push_back(rowset1);
+
+ // Second rowset: empty, no resource_id
+ RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+ ASSERT_TRUE(rowset2 != nullptr);
+ rowset2->set_hole_rowset(true);
+ rowsets.push_back(rowset2);
+
+ // Third rowset: has resource_id (should be found during backward
iteration)
+ RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
+ ASSERT_TRUE(rowset3 != nullptr);
+ rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
+ rowsets.push_back(rowset3);
+
+ compaction.set_input_rowsets(rowsets);
+
+ RowsetWriterContext ctx;
+ Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ // Storage resource should be set from rowset3
+ ASSERT_TRUE(ctx.storage_resource.has_value());
+ }
+
+ // Test case 3: Multiple rowsets with resource_id - should use the last
one (backward iteration)
+ {
+ std::vector<RowsetSharedPtr> rowsets;
+
+ // First rowset: has resource_id
+ RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
+ ASSERT_TRUE(rowset1 != nullptr);
+ StorageResource first_resource(fs);
+ rowset1->rowset_meta()->set_remote_storage_resource(first_resource);
+ rowsets.push_back(rowset1);
+
+ // Second rowset: empty, no resource_id
+ RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 0, false, 41);
+ ASSERT_TRUE(rowset2 != nullptr);
+ rowset2->set_hole_rowset(true);
+ rowsets.push_back(rowset2);
+
+ // Third rowset: has different resource_id (should be used due to
backward iteration)
+ RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 1, false, 41);
+ ASSERT_TRUE(rowset3 != nullptr);
+ rowset3->rowset_meta()->set_remote_storage_resource(storage_resource);
+ rowsets.push_back(rowset3);
+
+ compaction.set_input_rowsets(rowsets);
+
+ RowsetWriterContext ctx;
+ Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ // Storage resource should be set from rowset3 (last one with
resource_id)
+ ASSERT_TRUE(ctx.storage_resource.has_value());
+ }
+
+ // Test case 4: Non-empty rowset in the middle without resource_id -
should fail
+ {
+ std::vector<RowsetSharedPtr> rowsets;
+
+ // First rowset: has resource_id
+ RowsetSharedPtr rowset1 = create_rowset(Version(2, 2), 1, false, 41);
+ ASSERT_TRUE(rowset1 != nullptr);
+ rowset1->rowset_meta()->set_remote_storage_resource(storage_resource);
+ rowsets.push_back(rowset1);
+
+ // Second rowset: non-empty but no resource_id (invalid)
+ RowsetSharedPtr rowset2 = create_rowset(Version(3, 3), 2, false, 41);
+ ASSERT_TRUE(rowset2 != nullptr);
+ // Intentionally don't set resource_id
+ rowsets.push_back(rowset2);
+
+ // Third rowset: empty, no resource_id
+ RowsetSharedPtr rowset3 = create_rowset(Version(4, 4), 0, false, 41);
+ ASSERT_TRUE(rowset3 != nullptr);
+ rowset3->set_hole_rowset(true); // Mark as hole rowset since
num_segments=0
+ rowsets.push_back(rowset3);
+
+ compaction.set_input_rowsets(rowsets);
+
+ RowsetWriterContext ctx;
+ Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+ ASSERT_TRUE(st.is<ErrorCode::INTERNAL_ERROR>());
+ ASSERT_TRUE(st.to_string().find("Non-empty rowset must have valid
resource_id") !=
+ std::string::npos)
+ << st.to_string();
+ }
+
+ // Test case 5: Empty input rowsets - should succeed
+ {
+ std::vector<RowsetSharedPtr> rowsets; // Empty vector
+
+ compaction.set_input_rowsets(rowsets);
+
+ RowsetWriterContext ctx;
+ Status st = compaction.set_storage_resource_from_input_rowsets(ctx);
+ ASSERT_TRUE(st.ok()) << st.to_string();
+ // No storage resource should be set
+ ASSERT_FALSE(ctx.storage_resource.has_value());
+ }
+}
} // namespace doris
diff --git a/be/test/cloud/cloud_meta_mgr_test.cpp
b/be/test/cloud/cloud_meta_mgr_test.cpp
index b938d949553..b925dd6f645 100644
--- a/be/test/cloud/cloud_meta_mgr_test.cpp
+++ b/be/test/cloud/cloud_meta_mgr_test.cpp
@@ -21,6 +21,15 @@
#include <chrono>
#include <memory>
+#include <set>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
namespace doris {
using namespace cloud;
@@ -88,4 +97,531 @@ TEST_F(CloudMetaMgrTest, bthread_fork_join_test) {
// clang-format on
}
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_no_holes) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Add consecutive versions: 0, 1, 2, 3, 4
+ std::vector<RowsetSharedPtr> rowsets;
+ for (int64_t version = 0; version <= 4; ++version) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(1001);
+ rs_meta->set_index_id(2);
+ rs_meta->set_partition_id(15673);
+ rs_meta->set_tablet_uid(UniqueId(9, 10));
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_id(engine.next_rowset_id());
+ rs_meta->set_num_rows(100);
+ rs_meta->set_empty(false);
+ rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ // Create rowset and add it to tablet
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(tablet->tablet_schema(),
tablet->tablet_path(),
+ rs_meta, &rowset);
+ EXPECT_TRUE(status.ok());
+ rowsets.push_back(rowset);
+ }
+
+ // Add all rowsets to tablet
+ {
+ std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+ tablet->add_rowsets(std::move(rowsets), false, lock, false);
+ }
+
+ // Test fill_version_holes directly - should not add any rowsets since
there are no holes
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // Verify tablet still has the same number of rowsets (no holes to fill)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5);
+ // Verify rows number is correct
+ for (const auto& rs_meta : tablet->tablet_meta()->all_rs_metas()) {
+ EXPECT_EQ(rs_meta->num_rows(), 100);
+ }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_with_holes) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Add non-consecutive versions: 0, 2, 4 (missing 1, 3)
+ std::vector<int64_t> versions = {0, 2, 4};
+ std::vector<RowsetSharedPtr> rowsets;
+ for (int64_t version : versions) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(1001);
+ rs_meta->set_index_id(2);
+ rs_meta->set_partition_id(15673);
+ rs_meta->set_tablet_uid(UniqueId(9, 10));
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_id(engine.next_rowset_id());
+ rs_meta->set_num_rows(100);
+ rs_meta->set_empty(false);
+ rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ // Create rowset and add it to list
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(tablet->tablet_schema(),
tablet->tablet_path(),
+ rs_meta, &rowset);
+ EXPECT_TRUE(status.ok());
+ rowsets.push_back(rowset);
+ }
+
+ // Add all rowsets to tablet
+ {
+ std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+ tablet->add_rowsets(std::move(rowsets), false, lock, false);
+ }
+
+ // Initially we have 3 rowsets (versions 0, 2, 4)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
+
+ // Test fill_version_holes directly to fill missing versions 1 and 3
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 4, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // After filling holes, we should have 5 rowsets (versions 0, 1, 2, 3, 4)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 5);
+
+ // Verify all versions are present
+ auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+ std::set<int64_t> found_versions;
+ for (const auto& rs_meta : rs_metas) {
+ found_versions.insert(rs_meta->version().first);
+ }
+ EXPECT_EQ(found_versions.size(), 5);
+ EXPECT_TRUE(found_versions.contains(0));
+ EXPECT_TRUE(found_versions.contains(1));
+ EXPECT_TRUE(found_versions.contains(2));
+ EXPECT_TRUE(found_versions.contains(3));
+ EXPECT_TRUE(found_versions.contains(4));
+
+ // Verify the hole rowsets (versions 1 and 3) are empty
+ for (const auto& rs_meta : rs_metas) {
+ if (rs_meta->version().first == 1 || rs_meta->version().first == 3) {
+ EXPECT_TRUE(rs_meta->empty());
+ EXPECT_EQ(rs_meta->num_rows(), 0);
+ } else {
+ EXPECT_FALSE(rs_meta->empty());
+ EXPECT_EQ(rs_meta->num_rows(), 100);
+ }
+ }
+}
+
+// Test create_empty_rowset_for_hole function
+TEST_F(CloudMetaMgrTest, test_create_empty_rowset_for_hole) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Create a previous rowset meta to pass as reference
+ auto prev_rs_meta = std::make_shared<RowsetMeta>();
+ prev_rs_meta->set_tablet_id(1001);
+ prev_rs_meta->set_index_id(2);
+ prev_rs_meta->set_partition_id(15673);
+ prev_rs_meta->set_tablet_uid(UniqueId(9, 10));
+ prev_rs_meta->set_version(Version(1, 1));
+ prev_rs_meta->set_rowset_type(BETA_ROWSET);
+ prev_rs_meta->set_rowset_id(engine.next_rowset_id());
+ prev_rs_meta->set_num_rows(100);
+ prev_rs_meta->set_empty(false);
+ prev_rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ // Test creating an empty rowset for version hole
+ RowsetSharedPtr hole_rowset;
+ Status status =
+ meta_mgr.create_empty_rowset_for_hole(tablet.get(), 2,
prev_rs_meta, &hole_rowset);
+ EXPECT_TRUE(status.ok()) << "Failed to create empty rowset for hole: " <<
status;
+ EXPECT_NE(hole_rowset, nullptr);
+
+ // Verify the hole rowset properties
+ auto hole_rs_meta = hole_rowset->rowset_meta();
+ EXPECT_EQ(hole_rs_meta->tablet_id(), 15673);
+ EXPECT_EQ(hole_rs_meta->index_id(), 0);
+ EXPECT_EQ(hole_rs_meta->partition_id(), 2);
+ EXPECT_EQ(hole_rs_meta->tablet_uid(), UniqueId(9, 10));
+ EXPECT_EQ(hole_rs_meta->version(), Version(2, 2));
+ EXPECT_EQ(hole_rs_meta->rowset_type(), BETA_ROWSET);
+ EXPECT_EQ(hole_rs_meta->num_rows(), 0);
+ EXPECT_EQ(hole_rs_meta->total_disk_size(), 0);
+ EXPECT_EQ(hole_rs_meta->data_disk_size(), 0);
+ EXPECT_EQ(hole_rs_meta->index_disk_size(), 0);
+ EXPECT_TRUE(hole_rs_meta->empty());
+ EXPECT_EQ(hole_rs_meta->num_segments(), 0);
+ EXPECT_EQ(hole_rs_meta->segments_overlap(), NONOVERLAPPING);
+ EXPECT_EQ(hole_rs_meta->rowset_state(), VISIBLE);
+ EXPECT_TRUE(hole_rowset->is_hole_rowset());
+ EXPECT_EQ(hole_rowset->txn_id(), 2); // txn_id should match version
+ RowsetId expected_rowset_id;
+ expected_rowset_id.init(2, 0, 15673, 2);
+ EXPECT_EQ(hole_rowset->rowset_meta()->rowset_id(), expected_rowset_id);
+
+ // Test creating multiple hole rowsets with different versions
+ RowsetSharedPtr hole_rowset_v3;
+ status = meta_mgr.create_empty_rowset_for_hole(tablet.get(), 3,
prev_rs_meta, &hole_rowset_v3);
+ EXPECT_TRUE(status.ok());
+ EXPECT_NE(hole_rowset_v3, nullptr);
+ EXPECT_EQ(hole_rowset_v3->rowset_meta()->version(), Version(3, 3));
+ EXPECT_TRUE(hole_rowset_v3->is_hole_rowset());
+
+ // Verify different hole rowsets have different rowset IDs
+ EXPECT_NE(hole_rowset->rowset_meta()->rowset_id(),
hole_rowset_v3->rowset_meta()->rowset_id());
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_edge_cases) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ // Test case 1: max_version <= 0
+ {
+ TabletMetaSharedPtr tablet_meta(new TabletMeta(
+ 1001, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10),
+ TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+ auto tablet =
+ std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 0, wlock);
+ EXPECT_TRUE(status.ok());
+
+ status = meta_mgr.fill_version_holes(tablet.get(), -1, wlock);
+ EXPECT_TRUE(status.ok());
+
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0);
+ }
+
+ // Test case 2: empty tablet (no existing versions)
+ {
+ TabletMetaSharedPtr tablet_meta(new TabletMeta(
+ 1002, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
UniqueId(9, 10),
+ TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
+ auto tablet =
+ std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // Should still have no rowsets
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 0);
+ }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_trailing_holes) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1003, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Add only versions 0, 1, 2 but max_version is 5 (missing 3, 4, 5)
+ std::vector<RowsetSharedPtr> rowsets;
+ for (int64_t version = 0; version <= 2; ++version) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(1003);
+ rs_meta->set_index_id(2);
+ rs_meta->set_partition_id(15673);
+ rs_meta->set_tablet_uid(UniqueId(9, 10));
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_id(engine.next_rowset_id());
+ rs_meta->set_num_rows(100);
+ rs_meta->set_empty(false);
+ rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(tablet->tablet_schema(),
tablet->tablet_path(),
+ rs_meta, &rowset);
+ EXPECT_TRUE(status.ok());
+ rowsets.push_back(rowset);
+ }
+
+ // Add all rowsets to tablet
+ {
+ std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+ tablet->add_rowsets(std::move(rowsets), false, lock, false);
+ }
+
+ // Initially we have 3 rowsets (versions 0, 1, 2)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
+
+ // Test fill_version_holes to fill trailing holes (versions 3, 4, 5)
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4,
5)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6);
+
+ // Verify all versions are present
+ auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+ std::set<int64_t> found_versions;
+ for (const auto& rs_meta : rs_metas) {
+ found_versions.insert(rs_meta->version().first);
+ }
+ EXPECT_EQ(found_versions.size(), 6);
+ for (int64_t v = 0; v <= 5; ++v) {
+ EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
+ }
+
+ // Verify the trailing hole rowsets (versions 3, 4, 5) are empty
+ for (const auto& rs_meta : rs_metas) {
+ if (rs_meta->version().first >= 3) {
+ EXPECT_TRUE(rs_meta->empty())
+ << "Version " << rs_meta->version().first << " should be
empty";
+ EXPECT_EQ(rs_meta->num_rows(), 0);
+ EXPECT_EQ(rs_meta->total_disk_size(), 0);
+ } else {
+ EXPECT_FALSE(rs_meta->empty())
+ << "Version " << rs_meta->version().first << " should not
be empty";
+ EXPECT_EQ(rs_meta->num_rows(), 100);
+ }
+ }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_single_hole) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1004, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Add versions 0, 2 (missing only version 1)
+ std::vector<int64_t> versions = {0, 2};
+ std::vector<RowsetSharedPtr> rowsets;
+ for (int64_t version : versions) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(1004);
+ rs_meta->set_index_id(2);
+ rs_meta->set_partition_id(15673);
+ rs_meta->set_tablet_uid(UniqueId(9, 10));
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_id(engine.next_rowset_id());
+ rs_meta->set_num_rows(100);
+ rs_meta->set_empty(false);
+ rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(tablet->tablet_schema(),
tablet->tablet_path(),
+ rs_meta, &rowset);
+ EXPECT_TRUE(status.ok());
+ rowsets.push_back(rowset);
+ }
+
+ // Add all rowsets to tablet
+ {
+ std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+ tablet->add_rowsets(std::move(rowsets), false, lock, false);
+ }
+
+ // Initially we have 2 rowsets (versions 0, 2)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2);
+
+ // Test fill_version_holes to fill single hole (version 1)
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 2, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // After filling holes, we should have 3 rowsets (versions 0, 1, 2)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 3);
+
+ // Verify all versions are present
+ auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+ std::set<int64_t> found_versions;
+ for (const auto& rs_meta : rs_metas) {
+ found_versions.insert(rs_meta->version().first);
+ }
+ EXPECT_EQ(found_versions.size(), 3);
+ EXPECT_TRUE(found_versions.contains(0));
+ EXPECT_TRUE(found_versions.contains(1));
+ EXPECT_TRUE(found_versions.contains(2));
+
+ // Verify the hole rowset (version 1) is empty
+ for (const auto& rs_meta : rs_metas) {
+ if (rs_meta->version().first == 1) {
+ EXPECT_TRUE(rs_meta->empty());
+ EXPECT_EQ(rs_meta->num_rows(), 0);
+ EXPECT_EQ(rs_meta->total_disk_size(), 0);
+ } else {
+ EXPECT_FALSE(rs_meta->empty());
+ EXPECT_EQ(rs_meta->num_rows(), 100);
+ }
+ }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_multiple_consecutive_holes) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1005, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Add versions 0, 5 (missing 1, 2, 3, 4 - multiple consecutive holes)
+ std::vector<int64_t> versions = {0, 5};
+ std::vector<RowsetSharedPtr> rowsets;
+ for (int64_t version : versions) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(1005);
+ rs_meta->set_index_id(2);
+ rs_meta->set_partition_id(15673);
+ rs_meta->set_tablet_uid(UniqueId(9, 10));
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_id(engine.next_rowset_id());
+ rs_meta->set_num_rows(100);
+ rs_meta->set_empty(false);
+ rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(tablet->tablet_schema(),
tablet->tablet_path(),
+ rs_meta, &rowset);
+ EXPECT_TRUE(status.ok());
+ rowsets.push_back(rowset);
+ }
+
+ // Add all rowsets to tablet
+ {
+ std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+ tablet->add_rowsets(std::move(rowsets), false, lock, false);
+ }
+
+ // Initially we have 2 rowsets (versions 0, 5)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 2);
+
+ // Test fill_version_holes to fill multiple consecutive holes (versions 1,
2, 3, 4)
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 5, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // After filling holes, we should have 6 rowsets (versions 0, 1, 2, 3, 4,
5)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 6);
+
+ // Verify all versions are present
+ auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+ std::set<int64_t> found_versions;
+ for (const auto& rs_meta : rs_metas) {
+ found_versions.insert(rs_meta->version().first);
+ }
+ EXPECT_EQ(found_versions.size(), 6);
+ for (int64_t v = 0; v <= 5; ++v) {
+ EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
+ }
+
+ // Verify the hole rowsets (versions 1, 2, 3, 4) are empty
+ for (const auto& rs_meta : rs_metas) {
+ if (rs_meta->version().first >= 1 && rs_meta->version().first <= 4) {
+ EXPECT_TRUE(rs_meta->empty())
+ << "Version " << rs_meta->version().first << " should be
empty";
+ EXPECT_EQ(rs_meta->num_rows(), 0);
+ EXPECT_EQ(rs_meta->total_disk_size(), 0);
+ } else {
+ EXPECT_FALSE(rs_meta->empty())
+ << "Version " << rs_meta->version().first << " should not
be empty";
+ EXPECT_EQ(rs_meta->num_rows(), 100);
+ }
+ }
+}
+
+TEST_F(CloudMetaMgrTest, test_fill_version_holes_mixed_holes) {
+ CloudStorageEngine engine(EngineOptions {});
+ CloudMetaMgr meta_mgr;
+
+ TabletMetaSharedPtr tablet_meta(
+ new TabletMeta(1006, 2, 15673, 15674, 4, 5, TTabletSchema(), 6,
{{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F));
+ auto tablet = std::make_shared<CloudTablet>(engine,
std::make_shared<TabletMeta>(*tablet_meta));
+
+ // Add versions 0, 2, 5, 6 (missing 1, 3, 4 and potential trailing holes
up to max_version)
+ std::vector<int64_t> versions = {0, 2, 5, 6};
+ std::vector<RowsetSharedPtr> rowsets;
+ for (int64_t version : versions) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(1006);
+ rs_meta->set_index_id(2);
+ rs_meta->set_partition_id(15673);
+ rs_meta->set_tablet_uid(UniqueId(9, 10));
+ rs_meta->set_version(Version(version, version));
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_id(engine.next_rowset_id());
+ rs_meta->set_num_rows(100);
+ rs_meta->set_empty(false);
+ rs_meta->set_tablet_schema(tablet->tablet_schema());
+
+ RowsetSharedPtr rowset;
+ auto status = RowsetFactory::create_rowset(tablet->tablet_schema(),
tablet->tablet_path(),
+ rs_meta, &rowset);
+ EXPECT_TRUE(status.ok());
+ rowsets.push_back(rowset);
+ }
+
+ // Add all rowsets to tablet
+ {
+ std::unique_lock<std::shared_mutex> lock(tablet->get_header_lock());
+ tablet->add_rowsets(std::move(rowsets), false, lock, false);
+ }
+
+ // Initially we have 4 rowsets (versions 0, 2, 5, 6)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 4);
+
+ // Test fill_version_holes with max_version = 8 (should fill 1, 3, 4, 7, 8)
+ std::unique_lock<std::shared_mutex> wlock(tablet->get_header_lock());
+ Status status = meta_mgr.fill_version_holes(tablet.get(), 8, wlock);
+ EXPECT_TRUE(status.ok());
+
+ // After filling holes, we should have 9 rowsets (versions 0-8)
+ EXPECT_EQ(tablet->tablet_meta()->all_rs_metas().size(), 9);
+
+ // Verify all versions are present
+ auto rs_metas = tablet->tablet_meta()->all_rs_metas();
+ std::set<int64_t> found_versions;
+ for (const auto& rs_meta : rs_metas) {
+ found_versions.insert(rs_meta->version().first);
+ }
+ EXPECT_EQ(found_versions.size(), 9);
+ for (int64_t v = 0; v <= 8; ++v) {
+ EXPECT_TRUE(found_versions.contains(v)) << "Missing version " << v;
+ }
+
+ // Verify the hole rowsets (versions 1, 3, 4, 7, 8) are empty
+ std::set<int64_t> original_versions = {0, 2, 5, 6};
+ std::set<int64_t> hole_versions = {1, 3, 4, 7, 8};
+ for (const auto& rs_meta : rs_metas) {
+ int64_t version = rs_meta->version().first;
+ if (hole_versions.contains(version)) {
+ EXPECT_TRUE(rs_meta->empty()) << "Version " << version << " should
be empty";
+ EXPECT_EQ(rs_meta->num_rows(), 0);
+ EXPECT_EQ(rs_meta->total_disk_size(), 0);
+ } else if (original_versions.contains(version)) {
+ EXPECT_FALSE(rs_meta->empty()) << "Version " << version << "
should not be empty";
+ EXPECT_EQ(rs_meta->num_rows(), 100);
+ }
+ }
+}
+
} // namespace doris
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index d5efa55c606..8e4d8d93b14 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -2461,7 +2461,7 @@ void
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
LOG(WARNING) << msg;
return;
}
-
+ response->set_partition_max_version(version_pb.version());
if (version_pb.pending_txn_ids_size() > 0) {
DCHECK(version_pb.pending_txn_ids_size() == 1);
stats.get_bytes += txn->get_bytes();
diff --git a/cloud/src/meta-service/meta_service_job.cpp
b/cloud/src/meta-service/meta_service_job.cpp
index 588a5cca1d6..501dc4f43b3 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1000,7 +1000,9 @@ void process_compaction_job(MetaServiceCode& code,
std::string& msg, std::string
TEST_SYNC_POINT_CALLBACK("process_compaction_job::loop_input_done",
&num_rowsets);
- if (num_rowsets < 1) {
+ // compaction.num_input_rowsets is 0 when multiple hole rowsets are
compacted,
+ // we can continue to process the job for this case.
+ if (num_rowsets < 1 && compaction.num_input_rowsets() > 0) {
SS << "too few input rowsets, tablet_id=" << tablet_id << "
num_rowsets=" << num_rowsets;
code = MetaServiceCode::UNDEFINED_ERR;
msg = ss.str();
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 8faa59e7ce7..17bf92c08aa 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -2168,27 +2168,13 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id,
RecyclerMetricsContext&
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta",
&resp);
for (const auto& rs_meta : resp.rowset_meta()) {
- /*
- * For compatibility, we skip the loop for [0-1] here.
- * The purpose of this loop is to delete object files,
- * and since [0-1] only has meta and doesn't have object files,
- * skipping it doesn't affect system correctness.
- *
- * If not skipped, the check "if (!rs_meta.has_resource_id())" below
- * would return error -1 directly, causing the recycle operation to
fail.
- *
- * [0-1] doesn't have resource id is a bug.
- * In the future, we will fix this problem, after that,
- * we can remove this if statement.
- *
- * TODO(Yukang-Lian): remove this if statement when [0-1] has resource
id in the future.
- */
-
- if (rs_meta.end_version() == 1) {
- // Assert that [0-1] has no resource_id to make sure
- // this if statement will not be forgetted to remove
- // when the resource id bug is fixed
- DCHECK(!rs_meta.has_resource_id()) << "rs_meta" <<
rs_meta.ShortDebugString();
+ // The rowset has no resource id and segments when it was generated by
compaction
+ // with multiple hole rowsets or it's version is [0-1], so we can skip
it.
+ if (!rs_meta.has_resource_id() && rs_meta.num_segments() == 0) {
+ LOG_INFO("rowset meta does not have a resource id and no segments,
skip this rowset")
+ .tag("rs_meta", rs_meta.ShortDebugString())
+ .tag("instance_id", instance_id_)
+ .tag("tablet_id", tablet_id);
recycle_rowsets_number += 1;
continue;
}
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 987a6a15572..03f089e080a 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -4940,6 +4940,7 @@ TEST(RecyclerTest, recycle_tablet_without_resource_id) {
sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta",
[](auto&& args) {
auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
auto* rs = resp->add_rowset_meta();
+ rs->set_num_segments(1); // force delete rowset data
EXPECT_EQ(rs->has_resource_id(), false);
});
sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 3e18b2dde73..51fbf1e6cbc 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1077,7 +1077,9 @@ message GetRowsetResponse {
repeated doris.RowsetMetaCloudPB rowset_meta = 2;
optional TabletStatsPB stats = 3;
// Return dict value if SchemaOp is RETURN_DICT
- optional SchemaCloudDictionary schema_dict = 4;
+ optional SchemaCloudDictionary schema_dict = 4;
+ // The current max version of the partition
+ optional int64 partition_max_version = 5;
}
message GetSchemaDictRequest {
diff --git
a/regression-test/data/compaction/test_compaction_with_empty_rowset.out
b/regression-test/data/compaction/test_compaction_with_empty_rowset.out
new file mode 100644
index 00000000000..8f98a92a26c
Binary files /dev/null and
b/regression-test/data/compaction/test_compaction_with_empty_rowset.out differ
diff --git
a/regression-test/data/schema_change_p0/test_schema_change_mow_with_empty_rowset.out
b/regression-test/data/schema_change_p0/test_schema_change_mow_with_empty_rowset.out
new file mode 100644
index 00000000000..c3605547fe0
Binary files /dev/null and
b/regression-test/data/schema_change_p0/test_schema_change_mow_with_empty_rowset.out
differ
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index 946f05cbb19..23d34651ce5 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -46,5 +46,5 @@
sys_log_verbose_modules=query_context,runtime_query_statistics_mgr
# So feature has bug, so by default is false, only open it in pipeline to
observe
enable_parquet_page_index=true
-
+enable_fuzzy_mode=true
enable_prefill_all_dbm_agg_cache_after_compaction=true
diff --git
a/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
new file mode 100644
index 00000000000..6f6f869917d
--- /dev/null
+++ b/regression-test/suites/compaction/test_compaction_with_empty_rowset.groovy
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.awaitility.Awaitility
+
+suite("test_compaction_mow_with_empty_rowset", "p0") {
+ def tableName = "test_compaction_with_empty_rowset"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `k3` smallint(6) NULL,
+ `k4` int(30) NULL,
+ `k5` largeint(40) NULL,
+ `k6` float NULL,
+ `k7` double NULL,
+ `k8` decimal(9, 0) NULL,
+ `k9` char(10) NULL,
+ `k10` varchar(1024) NULL,
+ `k11` text NULL,
+ `k12` date NULL,
+ `k13` datetime NULL
+ ) ENGINE=OLAP
+ unique KEY(k1, k2, k3)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ for (int i = 0; i < 10; i++) {
+ sql """ insert into ${tableName} values (1, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+ 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+ }
+
+ qt_sql """ select * from ${tableName} order by k1, k2, k3 """
+
+
+ def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+ def replicaNum = get_table_replica_num(tableName)
+ logger.info("get table replica num: " + replicaNum)
+
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ int rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
+ }
+ assert (rowCount < 10 * replicaNum)
+ qt_sql2 """ select * from ${tableName} order by k1, k2, k3 """
+
+ for (int i = 0; i < 10; i++) {
+ sql """ insert into ${tableName} values (2, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+ 'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+ }
+
+ // trigger compactions for all tablets in ${tableName}
+ trigger_and_wait_compaction(tableName, "cumulative")
+ rowCount = 0
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ rowCount += Integer.parseInt(rowset.split(" ")[1])
+ }
+ }
+ assert (rowCount < 20 * replicaNum)
+ qt_sql3 """ select * from ${tableName} order by k1, k2, k3 """
+}
diff --git a/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
new file mode 100644
index 00000000000..a0fe1d58321
--- /dev/null
+++ b/regression-test/suites/schema_change_p0/test_schema_change_mow_with_empty_rowset.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_schema_change_mow_with_empty_rowset", "p0") {
+    def tableName = "test_sc_mow_with_empty_rowset"
+
+    // Returns the State column (index 9) of the most recent schema-change
+    // job targeting the given table.
+    def fetchJobState = { tbl ->
+        def rows = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1 """
+        return rows[0][9]
+    }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    // Merge-on-write unique-key table; two buckets so the schema change
+    // converts more than one tablet.
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(11) NULL,
+            `k2` tinyint(4) NULL,
+            `k3` smallint(6) NULL,
+            `k4` int(30) NULL,
+            `k5` largeint(40) NULL,
+            `k6` float NULL,
+            `k7` double NULL,
+            `k8` decimal(9, 0) NULL,
+            `k9` char(10) NULL,
+            `k10` varchar(1024) NULL,
+            `k11` text NULL,
+            `k12` date NULL,
+            `k13` datetime NULL
+        ) ENGINE=OLAP
+        unique KEY(k1, k2, k3)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 2
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "enable_unique_key_merge_on_write" = "true"
+        );
+    """
+
+    // Seed 100 rows with distinct keys before the schema change starts.
+    100.times { i ->
+        sql """ insert into ${tableName} values ($i, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+                'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }
+
+    // Type change on a value column (int -> string) kicks off the job.
+    sql """ alter table ${tableName} modify column k4 string NULL"""
+
+    // Keep upserting the same key while the job runs, producing incremental
+    // rowsets the concurrent schema change has to pick up.
+    20.times {
+        sql """ insert into ${tableName} values (100, 2, 3, 4, 5, 6.6, 1.7, 8.8,
+                'a', 'b', 'c', '2021-10-30', '2021-10-30 00:00:00') """
+    }
+
+    // Poll until the job reaches a terminal state; only FINISHED is
+    // acceptable -- a CANCELLED job fails the assertion inside the poll.
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
+        {
+            String state = fetchJobState(tableName)
+            if (state != "FINISHED" && state != "CANCELLED") {
+                return false
+            }
+            assertEquals("FINISHED", state)
+            return true
+        }
+    )
+
+    qt_sql """ select * from ${tableName} order by k1, k2, k3 """
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]