This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 210abd6b2 fix(ts): Ensure downstream deletion of all subsequent data
from latest bucket (#3183)
210abd6b2 is described below
commit 210abd6b2ce1d8c07d110873376698e4f6d034e4
Author: RX Xiao <[email protected]>
AuthorDate: Fri Sep 19 03:14:40 2025 +0800
fix(ts): Ensure downstream deletion of all subsequent data from latest
bucket (#3183)
When a range deletion causes the latest downstream bucket to change, it
must be ensured that all data subsequent to the new latest bucket is
also deleted.
---------
Co-authored-by: Twice <[email protected]>
Co-authored-by: Aleks Lozovyuk <[email protected]>
---
src/types/redis_timeseries.cc | 15 ++++++----
tests/cppunit/types/timeseries_test.cc | 53 ++++++++++++++++++++++++++++++++++
2 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 4f0e495d3..7c5050ec7 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -1544,6 +1544,15 @@ rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice
s = upsertCommonInBatch(ctx, ds_ns_key, meta, sample_batch, batch);
if (!s.ok()) return s;
+ // Update latest bucket index
+ if (!has_chunk) {
+ ds_meta.latest_bucket_idx = 0;
+ del_start = 0; // All buckets can be deleted in downstream
+ } else if (to > last_chunk_end) {
+ ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
+ del_start = ds_meta.latest_bucket_idx; // Latest bucket is changed, all subsequent buckets can be deleted
+ }
+
// Delete affected buckets in downstream
uint64_t deleted = 0;
s = delRangeCommonInBatch(ctx, ds_ns_key, meta, del_start, end_bucket, batch, &deleted, inclusive_end);
@@ -1552,12 +1561,6 @@ rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice
// Update latest bucket if deletion affects the end
if (end_bucket < ds_meta.latest_bucket_idx) continue;
- if (!has_chunk) {
- ds_meta.latest_bucket_idx = 0;
- } else if (to > last_chunk_end) {
- ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
- }
-
// Reaggregate latest bucket if needed
ds_meta.ResetAuxs();
if (has_chunk && last_chunk_start > 0) {
diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc
index 403321ea6..837e8d0ea 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -1031,6 +1031,59 @@ TEST_F(TimeSeriesTest, MRangeGroupSamplesAndReduce) {
}
}
+TEST_F(TimeSeriesTest, DelAllSamplesInLatestBucket) {
+ redis::TSCreateOption option;
+ option.chunk_size = 3;
+ const std::string key_src = "del_test_empty";
+ const std::string key_dst = "del_test_empty_dst";
+
+ auto s = ts_db_->Create(*ctx_, key_src, option);
+ EXPECT_TRUE(s.ok());
+ s = ts_db_->Create(*ctx_, key_dst, option);
+ EXPECT_TRUE(s.ok());
+
+ redis::TSAggregator aggregator;
+ aggregator.type = redis::TSAggregatorType::AVG;
+ aggregator.bucket_duration = 10;
+ aggregator.alignment = 0;
+
+ redis::TSCreateRuleResult result = redis::TSCreateRuleResult::kOK;
+ s = ts_db_->CreateRule(*ctx_, key_src, key_dst, aggregator, &result);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(result, redis::TSCreateRuleResult::kOK);
+
+ // Add sample data
+ std::vector<TSSample> samples = {{0, 10}, {11, 10}};
+ std::vector<TSChunk::AddResult> add_results(samples.size());
+ s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+ EXPECT_TRUE(s.ok());
+
+ // Del all samples in the latest bucket [10, 20)
+ uint64_t deleted = 0;
+ s = ts_db_->Del(*ctx_, key_src, 11, 20, &deleted);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(deleted, 1);
+
+ // Query the destination time series
+ redis::TSRangeOption range_opt;
+ std::vector<TSSample> res;
+ s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 0);
+
+ // Add sample data
+ samples = {{5, 5}, {101, 101}};
+ s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+ EXPECT_TRUE(s.ok());
+
+ // Query the destination time series
+ s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 1);
+ EXPECT_EQ(res[0].ts, 0);
+ EXPECT_EQ(res[0].v, 7.5);
+}
+
TEST_F(TimeSeriesTest, DelComprehensive) {
using TSCreateOption = redis::TSCreateOption;
using TSRangeOption = redis::TSRangeOption;