This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 32d349935 fix(ts): Avoid aggregating empty downstream bucket when
writing to empty series (#3184)
32d349935 is described below
commit 32d349935cfa699f2b2522983131cbe9e725da71
Author: RX Xiao <[email protected]>
AuthorDate: Fri Sep 19 01:11:32 2025 +0800
fix(ts): Avoid aggregating empty downstream bucket when writing to empty
series (#3184)
An edge case existed (due to the handling of [`latest_bucket_idx` in the
downstream
meta](https://github.com/apache/kvrocks/blob/4025a6d23698b6bd5383511f259d10988fe7dc0e/src/types/redis_timeseries.h#L91-L98))
where inserting data into an empty series whose first sample has
`bucket_idx > 0` would incorrectly aggregate an empty or invalid bucket
into the downstream series. The insertion logic has been corrected to fix
this bug.
- Add a `DownstreamUpsertArgs` struct to contain a flag that indicates
this edge case.
- Correctly update `latest_bucket_idx` in the downstream meta.
---------
Co-authored-by: Twice <[email protected]>
---
src/types/redis_timeseries.cc | 82 +++++++++++++++++++++-------------
src/types/redis_timeseries.h | 16 +++++--
tests/cppunit/types/timeseries_test.cc | 48 ++++++++++++++++++++
3 files changed, 112 insertions(+), 34 deletions(-)
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 3ae4873ac..4f0e495d3 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -245,10 +245,9 @@ std::vector<TSSample> GroupSamplesAndReduce(const
std::vector<std::vector<TSSamp
return result;
}
-std::vector<TSSample>
TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
- bool
skip_last_bucket) {
+std::vector<TSSample> TSDownStreamMeta::AggregateMultiBuckets(
+ const std::vector<nonstd::span<const TSSample>> &bucket_spans, bool
skip_last_bucket) {
std::vector<TSSample> res;
- auto bucket_spans = aggregator.SplitSamplesToBuckets(samples);
for (size_t i = 0; i < bucket_spans.size(); i++) {
const auto &span = bucket_spans[i];
if (span.empty()) {
@@ -853,9 +852,9 @@ rocksdb::Status
TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Sl
}
rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata &metadata,
- SampleBatch &sample_batch,
std::vector<std::string> *new_chunks) {
+ SampleBatch &sample_batch,
DownstreamUpsertArgs *ds_args) {
auto batch = storage_->GetWriteBatchBase();
- auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch,
new_chunks);
+ auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch,
ds_args);
if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
@@ -863,11 +862,11 @@ rocksdb::Status TimeSeries::upsertCommon(engine::Context
&ctx, const Slice &ns_k
rocksdb::Status TimeSeries::upsertCommonInBatch(engine::Context &ctx, const
Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
- std::vector<std::string>
*new_chunks) {
- auto all_batch_slice = sample_batch.AsSlice();
+ DownstreamUpsertArgs *ds_args)
{
+ if (ds_args != nullptr) ds_args->new_chunks.clear();
- if (all_batch_slice.GetSampleSpan().empty() && new_chunks != nullptr) {
- new_chunks->clear();
+ auto all_batch_slice = sample_batch.AsSlice();
+ if (all_batch_slice.GetSampleSpan().empty()) {
return rocksdb::Status::OK();
}
@@ -983,12 +982,15 @@ rocksdb::Status
TimeSeries::upsertCommonInBatch(engine::Context &ctx, const Slic
if (!s.ok()) return s;
}
- if (new_chunks) {
+ // For downstream processing
+ if (ds_args != nullptr) {
if (new_data_list.size()) {
- *new_chunks = std::move(new_data_list);
+ ds_args->new_chunks = std::move(new_data_list);
} else {
- *new_chunks = {std::move(latest_chunk_value)};
+ ds_args->new_chunks = {std::move(latest_chunk_value)};
}
+ ds_args->sample_batch = &sample_batch;
+ if (latest_chunk_key.empty()) ds_args->was_source_empty = true;
}
return rocksdb::Status::OK();
@@ -1102,9 +1104,10 @@ rocksdb::Status TimeSeries::rangeCommon(engine::Context
&ctx, const Slice &ns_ke
}
rocksdb::Status TimeSeries::upsertDownStream(engine::Context &ctx, const Slice
&ns_key,
- const TimeSeriesMetadata
&metadata,
- const std::vector<std::string>
&new_chunks, SampleBatch &sample_batch) {
+ const TimeSeriesMetadata
&metadata, DownstreamUpsertArgs &ds_args) {
// If no valid written
+ auto &new_chunks = ds_args.new_chunks;
+ auto *sample_batch = ds_args.sample_batch;
if (new_chunks.empty()) return rocksdb::Status::OK();
std::vector<std::string> downstream_keys;
std::vector<TSDownStreamMeta> downstream_metas;
@@ -1112,7 +1115,7 @@ rocksdb::Status
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
if (!s.ok()) return s;
if (downstream_keys.empty()) return rocksdb::Status::OK();
- auto all_batch_slice = sample_batch.AsSlice();
+ auto all_batch_slice = sample_batch->AsSlice();
uint64_t new_chunk_first_ts =
CreateTSChunkFromData(new_chunks[0])->GetFirstTimestamp();
nonstd::span<const AddResult> add_results =
all_batch_slice.GetAddResultSpan();
@@ -1239,18 +1242,37 @@ rocksdb::Status
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
if (new_chunks.size() > 1) {
is_meta_updates[i] = true;
}
+
+ // Avoid incorrect aggregation of the `bucket_idx=0` bucket,
+ // when inserting a sample with `bucket_idx>0` while the source series is
empty.
+ if (meta.latest_bucket_idx == 0 && ds_args.was_source_empty) {
+ auto chunk = CreateTSChunkFromData(new_chunks.front());
+ auto buckets = agg.SplitSamplesToBuckets(chunk->GetSamplesSpan());
+ if (buckets.size()) {
+ auto bkt_idx = agg.CalculateAlignedBucketLeft(buckets[0][0].ts);
+ if (bkt_idx > meta.latest_bucket_idx) {
+ meta.latest_bucket_idx = bkt_idx;
+ is_meta_updates[i] = true;
+ }
+ }
+ }
+
+ auto aggregate_chunk = [&](const auto &chunk, bool is_unsealed) {
+ auto buckets = agg.SplitSamplesToBuckets(chunk->GetSamplesSpan());
+ if (buckets.empty()) return;
+ auto samples = meta.AggregateMultiBuckets(buckets, is_unsealed);
+ agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+ };
// For chunk except the last chunk(sealed)
for (size_t j = 0; j < new_chunks.size() - 1; j++) {
auto chunk = CreateTSChunkFromData(new_chunks[j]);
- auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan());
- agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+ aggregate_chunk(chunk, false /* is_unsealed = false */);
}
// For last chunk(unsealed)
- auto chunk = CreateTSChunkFromData(new_chunks.back());
- auto newest_bucket_idx =
agg.CalculateAlignedBucketLeft(chunk->GetLastTimestamp());
+ auto last_chunk = CreateTSChunkFromData(new_chunks.back());
+ auto newest_bucket_idx =
agg.CalculateAlignedBucketLeft(last_chunk->GetLastTimestamp());
if (meta.latest_bucket_idx < newest_bucket_idx) {
- auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan(), true);
- agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
+ aggregate_chunk(last_chunk, true /* is_unsealed = true */);
is_meta_updates[i] = true;
}
}
@@ -1786,10 +1808,10 @@ rocksdb::Status TimeSeries::Add(engine::Context &ctx,
const Slice &user_key, TSS
if (!s.ok()) return s;
auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy :
metadata.duplicate_policy);
- std::vector<std::string> new_chunks;
- s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+ DownstreamUpsertArgs ds_args;
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
if (!s.ok()) return s;
- s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+ s = upsertDownStream(ctx, ns_key, metadata, ds_args);
if (!s.ok()) return s;
*res = sample_batch.GetFinalResults()[0];
return rocksdb::Status::OK();
@@ -1805,10 +1827,10 @@ rocksdb::Status TimeSeries::MAdd(engine::Context &ctx,
const Slice &user_key, st
return s;
}
auto sample_batch = SampleBatch(std::move(samples),
metadata.duplicate_policy);
- std::vector<std::string> new_chunks;
- s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+ DownstreamUpsertArgs ds_args;
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
if (!s.ok()) return s;
- s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+ s = upsertDownStream(ctx, ns_key, metadata, ds_args);
if (!s.ok()) return s;
*res = sample_batch.GetFinalResults();
return rocksdb::Status::OK();
@@ -2083,10 +2105,10 @@ rocksdb::Status TimeSeries::IncrBy(engine::Context
&ctx, const Slice &user_key,
}
auto sample_batch = SampleBatch({sample}, DuplicatePolicy::LAST);
- std::vector<std::string> new_chunks;
- s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
+ DownstreamUpsertArgs ds_args;
+ s = upsertCommon(ctx, ns_key, metadata, sample_batch, &ds_args);
if (!s.ok()) return s;
- s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
+ s = upsertDownStream(ctx, ns_key, metadata, ds_args);
if (!s.ok()) return s;
*res = sample_batch.GetFinalResults()[0];
return rocksdb::Status::OK();
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 91b337be7..f895c8af8 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -104,7 +104,8 @@ struct TSDownStreamMeta {
// Aggregate samples and update the auxiliary info and latest_bucket_idx if
needed.
// Returns the aggregated samples if there are new buckets.
// Note: Samples must be sorted by timestamp.
- std::vector<TSSample> AggregateMultiBuckets(nonstd::span<const TSSample>
samples, bool skip_last_bucket = false);
+ std::vector<TSSample> AggregateMultiBuckets(const
std::vector<nonstd::span<const TSSample>> &bucket_spans,
+ bool skip_last_bucket = false);
// Aggregate the samples to the latest bucket, update the auxiliary info.
void AggregateLatestBucket(nonstd::span<const TSSample> samples);
@@ -284,6 +285,13 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t
from, uint64_t to, uint64_t *deleted);
private:
+ // Bundles the arguments for a downstream upsert operation
+ struct DownstreamUpsertArgs {
+ std::vector<std::string> new_chunks; // Newly created chunks
+ bool was_source_empty = false; // Whether the source time series
was empty before upsert
+ SampleBatch *sample_batch = nullptr; // The sample batch that has been
upserted to source
+ };
+
rocksdb::ColumnFamilyHandle *index_cf_handle_;
rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata *metadata);
@@ -294,14 +302,14 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
LabelKVList *labels);
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata &metadata,
- SampleBatch &sample_batch,
std::vector<std::string> *new_chunks = nullptr);
+ SampleBatch &sample_batch, DownstreamUpsertArgs
*ds_args = nullptr);
rocksdb::Status upsertCommonInBatch(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
- std::vector<std::string> *new_chunks =
nullptr);
+ DownstreamUpsertArgs *ds_args = nullptr);
rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const
TimeSeriesMetadata &metadata,
const TSRangeOption &option,
std::vector<TSSample> *res, bool apply_retention = true);
rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
- const std::vector<std::string> &new_chunks,
SampleBatch &sample_batch);
+ DownstreamUpsertArgs &ds_args);
rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const
TimeSeriesMetadata &metadata,
bool is_return_latest, std::vector<TSSample> *res);
rocksdb::Status delRangeCommon(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata &metadata, uint64_t from,
diff --git a/tests/cppunit/types/timeseries_test.cc
b/tests/cppunit/types/timeseries_test.cc
index c448bc811..403321ea6 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -468,6 +468,54 @@ TEST_F(TimeSeriesTest, CreateRuleErrorCases) {
}
}
+TEST_F(TimeSeriesTest, AggregationOnEmptySeries) {
+ redis::TSCreateOption option;
+ option.chunk_size = 3;
+ const std::string key_src = "agg_test_empty";
+ const std::string key_dst = "agg_test_empty_dst";
+
+ auto s = ts_db_->Create(*ctx_, key_src, option);
+ EXPECT_TRUE(s.ok());
+ s = ts_db_->Create(*ctx_, key_dst, option);
+ EXPECT_TRUE(s.ok());
+
+ redis::TSAggregator aggregator;
+ aggregator.type = redis::TSAggregatorType::AVG;
+ aggregator.bucket_duration = 10;
+ aggregator.alignment = 0;
+
+ redis::TSCreateRuleResult result = redis::TSCreateRuleResult::kOK;
+ s = ts_db_->CreateRule(*ctx_, key_src, key_dst, aggregator, &result);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(result, redis::TSCreateRuleResult::kOK);
+
+ // Add sample data
+ std::vector<TSSample> samples = {{11, 10}};
+ std::vector<TSChunk::AddResult> add_results(samples.size());
+ s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+ EXPECT_TRUE(s.ok());
+
+ // Query the destination time series
+ std::vector<TSSample> res;
+ redis::TSRangeOption range_opt;
+
+ s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 0);
+
+ // Add sample data
+ samples = {{101, 10}};
+ s = ts_db_->MAdd(*ctx_, key_src, samples, &add_results);
+ EXPECT_TRUE(s.ok());
+
+ // Query the destination time series
+ s = ts_db_->Range(*ctx_, key_dst, range_opt, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res.size(), 1);
+ EXPECT_EQ(res[0].ts, 10);
+ EXPECT_EQ(res[0].v, 10);
+}
+
TEST_F(TimeSeriesTest, AggregationMultiple) {
redis::TSCreateOption option;
option.chunk_size = 3;