This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 210abd6b2 fix(ts): Ensure downstream deletion of all subsequent data from latest bucket (#3183)
210abd6b2 is described below

commit 210abd6b2ce1d8c07d110873376698e4f6d034e4
Author: RX Xiao <[email protected]>
AuthorDate: Fri Sep 19 03:14:40 2025 +0800

    fix(ts): Ensure downstream deletion of all subsequent data from latest bucket (#3183)
    
    When a range deletion causes the latest downstream bucket to change, it
    must be ensured that all data subsequent to the new latest bucket is
    also deleted.
    
    ---------
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Aleks Lozovyuk <[email protected]>
---
 src/types/redis_timeseries.cc          | 15 ++++++----
 tests/cppunit/types/timeseries_test.cc | 53 ++++++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 4f0e495d3..7c5050ec7 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -1544,6 +1544,15 @@ rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice
     s = upsertCommonInBatch(ctx, ds_ns_key, meta, sample_batch, batch);
     if (!s.ok()) return s;
 
+    // Update latest bucket index
+    if (!has_chunk) {
+      ds_meta.latest_bucket_idx = 0;
+      del_start = 0;  // All buckets can be deleted in downstream
+    } else if (to > last_chunk_end) {
+      ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
+      del_start = ds_meta.latest_bucket_idx;  // Latest bucket is changed, all subsequent buckets can be deleted
+    }
+
     // Delete affected buckets in downstream
     uint64_t deleted = 0;
     s = delRangeCommonInBatch(ctx, ds_ns_key, meta, del_start, end_bucket, batch, &deleted, inclusive_end);
@@ -1552,12 +1561,6 @@ rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice
     // Update latest bucket if deletion affects the end
     if (end_bucket < ds_meta.latest_bucket_idx) continue;
 
-    if (!has_chunk) {
-      ds_meta.latest_bucket_idx = 0;
-    } else if (to > last_chunk_end) {
-      ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
-    }
-
     // Reaggregate latest bucket if needed
     ds_meta.ResetAuxs();
     if (has_chunk && last_chunk_start > 0) {
diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc
index 403321ea6..837e8d0ea 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -1031,6 +1031,59 @@ TEST_F(TimeSeriesTest, MRangeGroupSamplesAndReduce) {
   }
 }
 
+TEST_F(TimeSeriesTest, DelAllSamplesInLatestBucket) {
+  redis::TSCreateOption option;
+  option.chunk_size = 3;
+  const std::string key_src = "del_test_empty";
+  const std::string key_dst = "del_test_empty_dst";
+
+  auto s = ts_db_->Create(*ctx_, key_src, option);
+  EXPECT_TRUE(s.ok());
+  s = ts_db_->Create(*ctx_, key_dst, option);
+  EXPECT_TRUE(s.ok());
+
+  redis::TSAggregator aggregator;
+  aggregator.type = redis::TSAggregatorType::AVG;
+  aggregator.bucket_duration = 10;
+  aggregator.alignment = 0;
+
+  redis::TSCreateRuleResult result = redis::TSCreateRuleResult::kOK;
+  s = ts_db_->CreateRule(*ctx_, key_src, key_dst, aggregator, &result);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(result, redis::TSCreateRuleResult::kOK);
+
+  // Add sample data
+  std::vector<TSSample> samples = {{0, 10}, {11, 10}};
+  std::vector<TSChunk::AddResult> add_results(samples.size());
+  s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+  EXPECT_TRUE(s.ok());
+
+  // Del all samples in the latest bucket (10,20]
+  uint64_t deleted = 0;
+  s = ts_db_->Del(*ctx_, key_src, 11, 20, &deleted);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(deleted, 1);
+
+  // Query the destination time series
+  redis::TSRangeOption range_opt;
+  std::vector<TSSample> res;
+  s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 0);
+
+  // Add sample data
+  samples = {{5, 5}, {101, 101}};
+  s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+  EXPECT_TRUE(s.ok());
+
+  // Query the destination time series
+  s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 1);
+  EXPECT_EQ(res[0].ts, 0);
+  EXPECT_EQ(res[0].v, 7.5);
+}
+
 TEST_F(TimeSeriesTest, DelComprehensive) {
   using TSCreateOption = redis::TSCreateOption;
   using TSRangeOption = redis::TSRangeOption;

Reply via email to