This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 32d349935 fix(ts): Avoid aggregating empty downstream bucket when writing to empty series (#3184)
32d349935 is described below

commit 32d349935cfa699f2b2522983131cbe9e725da71
Author: RX Xiao <[email protected]>
AuthorDate: Fri Sep 19 01:11:32 2025 +0800

    fix(ts): Avoid aggregating empty downstream bucket when writing to empty series (#3184)
    
    An edge case existed (due to the handling of
    [`latest_bucket_idx` in the downstream meta](https://github.com/apache/kvrocks/blob/4025a6d23698b6bd5383511f259d10988fe7dc0e/src/types/redis_timeseries.h#L91-L98))
    where inserting data into an empty series, when the first sample falls
    in a bucket with `bucket index > 0`, would incorrectly aggregate an empty
    or invalid bucket to the downstream series. The insertion logic has been
    corrected to fix this bug.
    
    - Add a `DownstreamUpsertArgs` struct to contain a flag that indicates
    this edge case.
    - Correctly update `latest_bucket_idx` in the downstream meta.
    
    ---------
    
    Co-authored-by: Twice <[email protected]>
---
 src/types/redis_timeseries.cc          | 82 +++++++++++++++++++++-------------
 src/types/redis_timeseries.h           | 16 +++++--
 tests/cppunit/types/timeseries_test.cc | 48 ++++++++++++++++++++
 3 files changed, 112 insertions(+), 34 deletions(-)

diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 3ae4873ac..4f0e495d3 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -245,10 +245,9 @@ std::vector<TSSample> GroupSamplesAndReduce(const 
std::vector<std::vector<TSSamp
   return result;
 }
 
-std::vector<TSSample> 
TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
-                                                              bool 
skip_last_bucket) {
+std::vector<TSSample> TSDownStreamMeta::AggregateMultiBuckets(
+    const std::vector<nonstd::span<const TSSample>> &bucket_spans, bool 
skip_last_bucket) {
   std::vector<TSSample> res;
-  auto bucket_spans = aggregator.SplitSamplesToBuckets(samples);
   for (size_t i = 0; i < bucket_spans.size(); i++) {
     const auto &span = bucket_spans[i];
     if (span.empty()) {
@@ -853,9 +852,9 @@ rocksdb::Status 
TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
 }
 
 rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata &metadata,
-                                         SampleBatch &sample_batch, 
std::vector<std::string> *new_chunks) {
+                                         SampleBatch &sample_batch, 
DownstreamUpsertArgs *ds_args) {
   auto batch = storage_->GetWriteBatchBase();
-  auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch, 
new_chunks);
+  auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch, 
ds_args);
   if (!s.ok()) return s;
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
@@ -863,11 +862,11 @@ rocksdb::Status TimeSeries::upsertCommon(engine::Context 
&ctx, const Slice &ns_k
 rocksdb::Status TimeSeries::upsertCommonInBatch(engine::Context &ctx, const 
Slice &ns_key, TimeSeriesMetadata &metadata,
                                                 SampleBatch &sample_batch,
                                                 
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
-                                                std::vector<std::string> 
*new_chunks) {
-  auto all_batch_slice = sample_batch.AsSlice();
+                                                DownstreamUpsertArgs *ds_args) 
{
+  if (ds_args != nullptr) ds_args->new_chunks.clear();
 
-  if (all_batch_slice.GetSampleSpan().empty() && new_chunks != nullptr) {
-    new_chunks->clear();
+  auto all_batch_slice = sample_batch.AsSlice();
+  if (all_batch_slice.GetSampleSpan().empty()) {
     return rocksdb::Status::OK();
   }
 
@@ -983,12 +982,15 @@ rocksdb::Status 
TimeSeries::upsertCommonInBatch(engine::Context &ctx, const Slic
     if (!s.ok()) return s;
   }
 
-  if (new_chunks) {
+  // For downstream processing
+  if (ds_args != nullptr) {
     if (new_data_list.size()) {
-      *new_chunks = std::move(new_data_list);
+      ds_args->new_chunks = std::move(new_data_list);
     } else {
-      *new_chunks = {std::move(latest_chunk_value)};
+      ds_args->new_chunks = {std::move(latest_chunk_value)};
     }
+    ds_args->sample_batch = &sample_batch;
+    if (latest_chunk_key.empty()) ds_args->was_source_empty = true;
   }
 
   return rocksdb::Status::OK();
@@ -1102,9 +1104,10 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context 
&ctx, const Slice &ns_ke
 }
 
 rocksdb::Status TimeSeries::upsertDownStream(engine::Context &ctx, const Slice 
&ns_key,
-                                             const TimeSeriesMetadata 
&metadata,
-                                             const std::vector<std::string> 
&new_chunks, SampleBatch &sample_batch) {
+                                             const TimeSeriesMetadata 
&metadata, DownstreamUpsertArgs &ds_args) {
   // If no valid written
+  auto &new_chunks = ds_args.new_chunks;
+  auto *sample_batch = ds_args.sample_batch;
   if (new_chunks.empty()) return rocksdb::Status::OK();
   std::vector<std::string> downstream_keys;
   std::vector<TSDownStreamMeta> downstream_metas;
@@ -1112,7 +1115,7 @@ rocksdb::Status 
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
   if (!s.ok()) return s;
   if (downstream_keys.empty()) return rocksdb::Status::OK();
 
-  auto all_batch_slice = sample_batch.AsSlice();
+  auto all_batch_slice = sample_batch->AsSlice();
   uint64_t new_chunk_first_ts = 
CreateTSChunkFromData(new_chunks[0])->GetFirstTimestamp();
 
   nonstd::span<const AddResult> add_results = 
all_batch_slice.GetAddResultSpan();
@@ -1239,18 +1242,37 @@ rocksdb::Status 
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
     if (new_chunks.size() > 1) {
       is_meta_updates[i] = true;
     }
+
+    // Avoid incorrect aggregation of the `bucket_idx=0` bucket,
+    // when inserting a sample with `bucket_idx>0` while the source series is 
empty.
+    if (meta.latest_bucket_idx == 0 && ds_args.was_source_empty) {
+      auto chunk = CreateTSChunkFromData(new_chunks.front());
+      auto buckets = agg.SplitSamplesToBuckets(chunk->GetSamplesSpan());
+      if (buckets.size()) {
+        auto bkt_idx = agg.CalculateAlignedBucketLeft(buckets[0][0].ts);
+        if (bkt_idx > meta.latest_bucket_idx) {
+          meta.latest_bucket_idx = bkt_idx;
+          is_meta_updates[i] = true;
+        }
+      }
+    }
+
+    auto aggregate_chunk = [&](const auto &chunk, bool is_unsealed) {
+      auto buckets = agg.SplitSamplesToBuckets(chunk->GetSamplesSpan());
+      if (buckets.empty()) return;
+      auto samples = meta.AggregateMultiBuckets(buckets, is_unsealed);
+      agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+    };
     // For chunk except the last chunk(sealed)
     for (size_t j = 0; j < new_chunks.size() - 1; j++) {
       auto chunk = CreateTSChunkFromData(new_chunks[j]);
-      auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan());
-      agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+      aggregate_chunk(chunk, false /* is_unsealed = false */);
     }
     // For last chunk(unsealed)
-    auto chunk = CreateTSChunkFromData(new_chunks.back());
-    auto newest_bucket_idx = 
agg.CalculateAlignedBucketLeft(chunk->GetLastTimestamp());
+    auto last_chunk = CreateTSChunkFromData(new_chunks.back());
+    auto newest_bucket_idx = 
agg.CalculateAlignedBucketLeft(last_chunk->GetLastTimestamp());
     if (meta.latest_bucket_idx < newest_bucket_idx) {
-      auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan(), true);
-      agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+      aggregate_chunk(last_chunk, true /* is_unsealed = true */);
       is_meta_updates[i] = true;
     }
   }
@@ -1786,10 +1808,10 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx, 
const Slice &user_key, TSS
   if (!s.ok()) return s;
   auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : 
metadata.duplicate_policy);
 
-  std::vector<std::string> new_chunks;
-  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+  DownstreamUpsertArgs ds_args;
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
   if (!s.ok()) return s;
-  s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+  s = upsertDownStream(ctx, ns_key, metadata, ds_args);
   if (!s.ok()) return s;
   *res = sample_batch.GetFinalResults()[0];
   return rocksdb::Status::OK();
@@ -1805,10 +1827,10 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, 
const Slice &user_key, st
     return s;
   }
   auto sample_batch = SampleBatch(std::move(samples), 
metadata.duplicate_policy);
-  std::vector<std::string> new_chunks;
-  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+  DownstreamUpsertArgs ds_args;
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
   if (!s.ok()) return s;
-  s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+  s = upsertDownStream(ctx, ns_key, metadata, ds_args);
   if (!s.ok()) return s;
   *res = sample_batch.GetFinalResults();
   return rocksdb::Status::OK();
@@ -2083,10 +2105,10 @@ rocksdb::Status TimeSeries::IncrBy(engine::Context 
&ctx, const Slice &user_key,
   }
   auto sample_batch = SampleBatch({sample}, DuplicatePolicy::LAST);
 
-  std::vector<std::string> new_chunks;
-  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+  DownstreamUpsertArgs ds_args;
+  s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
   if (!s.ok()) return s;
-  s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+  s = upsertDownStream(ctx, ns_key, metadata, ds_args);
   if (!s.ok()) return s;
   *res = sample_batch.GetFinalResults()[0];
   return rocksdb::Status::OK();
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 91b337be7..f895c8af8 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -104,7 +104,8 @@ struct TSDownStreamMeta {
   // Aggregate samples and update the auxiliary info and latest_bucket_idx if 
needed.
   // Returns the aggregated samples if there are new buckets.
   // Note: Samples must be sorted by timestamp.
-  std::vector<TSSample> AggregateMultiBuckets(nonstd::span<const TSSample> 
samples, bool skip_last_bucket = false);
+  std::vector<TSSample> AggregateMultiBuckets(const 
std::vector<nonstd::span<const TSSample>> &bucket_spans,
+                                              bool skip_last_bucket = false);
 
   // Aggregate the samples to the latest bucket, update the auxiliary info.
   void AggregateLatestBucket(nonstd::span<const TSSample> samples);
@@ -284,6 +285,13 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t 
from, uint64_t to, uint64_t *deleted);
 
  private:
+  // Bundles the arguments for a downstream upsert operation
+  struct DownstreamUpsertArgs {
+    std::vector<std::string> new_chunks;  // Newly created chunks
+    bool was_source_empty = false;        // Whether the source time series 
was empty before upsert
+    SampleBatch *sample_batch = nullptr;  // The sample batch that has been 
upserted to source
+  };
+
   rocksdb::ColumnFamilyHandle *index_cf_handle_;
 
   rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata *metadata);
@@ -294,14 +302,14 @@ class TimeSeries : public SubKeyScanner {
   rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, 
const TimeSeriesMetadata &metadata,
                                  LabelKVList *labels);
   rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, 
TimeSeriesMetadata &metadata,
-                               SampleBatch &sample_batch, 
std::vector<std::string> *new_chunks = nullptr);
+                               SampleBatch &sample_batch, DownstreamUpsertArgs 
*ds_args = nullptr);
   rocksdb::Status upsertCommonInBatch(engine::Context &ctx, const Slice 
&ns_key, TimeSeriesMetadata &metadata,
                                       SampleBatch &sample_batch, 
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
-                                      std::vector<std::string> *new_chunks = 
nullptr);
+                                      DownstreamUpsertArgs *ds_args = nullptr);
   rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                               const TSRangeOption &option, 
std::vector<TSSample> *res, bool apply_retention = true);
   rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key, 
const TimeSeriesMetadata &metadata,
-                                   const std::vector<std::string> &new_chunks, 
SampleBatch &sample_batch);
+                                   DownstreamUpsertArgs &ds_args);
   rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const 
TimeSeriesMetadata &metadata,
                             bool is_return_latest, std::vector<TSSample> *res);
   rocksdb::Status delRangeCommon(engine::Context &ctx, const Slice &ns_key, 
TimeSeriesMetadata &metadata, uint64_t from,
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
index c448bc811..403321ea6 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -468,6 +468,54 @@ TEST_F(TimeSeriesTest, CreateRuleErrorCases) {
   }
 }
 
+TEST_F(TimeSeriesTest, AggregationOnEmptySeries) {
+  redis::TSCreateOption option;
+  option.chunk_size = 3;
+  const std::string key_src = "agg_test_empty";
+  const std::string key_dst = "agg_test_empty_dst";
+
+  auto s = ts_db_->Create(*ctx_, key_src, option);
+  EXPECT_TRUE(s.ok());
+  s = ts_db_->Create(*ctx_, key_dst, option);
+  EXPECT_TRUE(s.ok());
+
+  redis::TSAggregator aggregator;
+  aggregator.type = redis::TSAggregatorType::AVG;
+  aggregator.bucket_duration = 10;
+  aggregator.alignment = 0;
+
+  redis::TSCreateRuleResult result = redis::TSCreateRuleResult::kOK;
+  s = ts_db_->CreateRule(*ctx_, key_src, key_dst, aggregator, &result);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(result, redis::TSCreateRuleResult::kOK);
+
+  // Add sample data
+  std::vector<TSSample> samples = {{11, 10}};
+  std::vector<TSChunk::AddResult> add_results(samples.size());
+  s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+  EXPECT_TRUE(s.ok());
+
+  // Query the destination time series
+  std::vector<TSSample> res;
+  redis::TSRangeOption range_opt;
+
+  s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 0);
+
+  // Add sample data
+  samples = {{101, 10}};
+  s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+  EXPECT_TRUE(s.ok());
+
+  // Query the destination time series
+  s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(res.size(), 1);
+  EXPECT_EQ(res[0].ts, 10);
+  EXPECT_EQ(res[0].v, 10);
+}
+
 TEST_F(TimeSeriesTest, AggregationMultiple) {
   redis::TSCreateOption option;
   option.chunk_size = 3;

Reply via email to