This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 2dcbbf777 fix(ts): Enhance retrieval of source and target series in
compaction rule (#3201)
2dcbbf777 is described below
commit 2dcbbf777264405719850c5f2cc8ad4349296563
Author: RX Xiao <[email protected]>
AuthorDate: Sun Sep 28 19:25:18 2025 +0800
fix(ts): Enhance retrieval of source and target series in compaction rule
(#3201)
Previously, when a TS key with a compaction rule was deleted and a key
with the same name was recreated, its source and target series were
retrieved incorrectly. This PR fixes the issue, ensuring that:
- `TS.INFO` correctly displays the `rules` and `sourceKey` fields
- `TS.CREATERULE` properly creates rules and handles error cases
---
src/commands/cmd_timeseries.cc | 7 +-
src/types/redis_timeseries.cc | 153 ++++++++++++++++++++++++---------
src/types/redis_timeseries.h | 17 ++--
tests/cppunit/types/timeseries_test.cc | 59 +++++++++++++
4 files changed, 186 insertions(+), 50 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 9912281f6..add0113c0 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -379,10 +379,9 @@ class CommandTSInfo : public Commander {
std::vector<std::string> rules_str;
rules_str.reserve(info.downstream_rules.size());
for (const auto &[key, rule] : info.downstream_rules) {
- const auto &aggregator = rule.aggregator;
- auto str = redis::Array({redis::BulkString(key),
redis::Integer(aggregator.bucket_duration),
-
redis::SimpleString(FormatAggregatorTypeAsRedisReply(aggregator.type)),
- redis::Integer(aggregator.alignment)});
+ auto str = redis::Array({redis::BulkString(key),
redis::Integer(rule.bucket_duration),
+
redis::SimpleString(FormatAggregatorTypeAsRedisReply(rule.type)),
+ redis::Integer(rule.alignment)});
rules_str.push_back(str);
}
*output += redis::Array(rules_str);
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 34ec769f4..e5f5d1188 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -1117,7 +1117,8 @@ rocksdb::Status
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
if (new_chunks.empty()) return rocksdb::Status::OK();
std::vector<std::string> downstream_keys;
std::vector<TSDownStreamMeta> downstream_metas;
- auto s = getDownStreamRules(ctx, ns_key, metadata, &downstream_keys,
&downstream_metas);
+ std::vector<TimeSeriesMetadata> downstream_series_meta;
+ auto s = getDownStreamRules(ctx, ns_key, metadata, &downstream_keys,
&downstream_metas, &downstream_series_meta);
if (!s.ok()) return s;
if (downstream_keys.empty()) return rocksdb::Status::OK();
@@ -1294,27 +1295,24 @@ rocksdb::Status
TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &
continue;
}
const auto &meta = downstream_metas[i];
- const auto &key = downstream_keys[i];
+ auto ds_key = internalKeyFromDownstreamKey(ns_key, metadata,
downstream_keys[i]);
std::string bytes;
meta.Encode(&bytes);
- s = batch->Put(key, bytes);
+ s = batch->Put(ds_key, bytes);
if (!s.ok()) return s;
}
// Write aggregated samples
for (size_t i = 0; i < downstream_metas.size(); i++) {
- const auto &ds_key = downstream_keys[i];
- auto key = downstreamKeyFromInternalKey(ds_key);
+ const auto &key = downstream_keys[i];
auto ns_key = AppendNamespacePrefix(key);
+ auto &metadata = downstream_series_meta[i];
+
auto &agg_samples = all_agg_samples[i];
auto &agg_samples_inc = all_agg_samples_inc[i];
if (agg_samples.empty() && agg_samples_inc.empty()) {
continue;
}
- TimeSeriesMetadata metadata;
- s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
- if (!s.ok()) return s;
-
if (agg_samples.size()) {
auto sample_batch_t = SampleBatch(std::move(agg_samples),
DuplicatePolicy::LAST);
s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
@@ -1465,9 +1463,11 @@ rocksdb::Status
TimeSeries::delRangeCommonInBatch(engine::Context &ctx, const Sl
}
rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const
Slice &ns_key, TimeSeriesMetadata &metadata,
- std::vector<std::string>
&ds_keys,
- std::vector<TSDownStreamMeta>
&ds_metas, uint64_t from, uint64_t to) {
- if (from > to || ds_keys.empty()) return rocksdb::Status::OK();
+ std::vector<std::string>
&ds_user_keys,
+ std::vector<TSDownStreamMeta>
&ds_metas,
+ std::vector<TimeSeriesMetadata>
&ds_series_metas, uint64_t from,
+ uint64_t to) {
+ if (from > to || ds_user_keys.empty()) return rocksdb::Status::OK();
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTimeSeries);
@@ -1521,14 +1521,11 @@ rocksdb::Status
TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice
}
// Process each downstream rule
- for (size_t i = 0; i < ds_keys.size(); i++) {
+ for (size_t i = 0; i < ds_user_keys.size(); i++) {
+ auto ds_ns_key = AppendNamespacePrefix(ds_user_keys[i]);
auto &ds_meta = ds_metas[i];
auto &agg = ds_meta.aggregator;
-
- TimeSeriesMetadata meta;
- auto ds_ns_key =
AppendNamespacePrefix(downstreamKeyFromInternalKey(ds_keys[i]));
- s = getTimeSeriesMetadata(ctx, ds_ns_key, &meta);
- if (!s.ok()) return s;
+ auto &meta = ds_series_metas[i];
// Calculate the range of buckets affected by this deletion.
uint64_t start_bucket = agg.CalculateAlignedBucketLeft(from);
@@ -1587,7 +1584,9 @@ rocksdb::Status
TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice
// Persist downstream metadata updates if needed
std::string bytes;
ds_meta.Encode(&bytes);
- batch->Put(ds_keys[i], bytes);
+ auto ds_key = internalKeyFromDownstreamKey(ns_key, metadata,
ds_user_keys[i]);
+ s = batch->Put(ds_key, bytes);
+ if (!s.ok()) return s;
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
@@ -1652,25 +1651,47 @@ rocksdb::Status
TimeSeries::createDownStreamMetadataInBatch(engine::Context &ctx
}
rocksdb::Status TimeSeries::getDownStreamRules(engine::Context &ctx, const
Slice &ns_src_key,
- const TimeSeriesMetadata
&src_metadata, std::vector<std::string> *keys,
- std::vector<TSDownStreamMeta>
*metas) {
+ const TimeSeriesMetadata
&src_metadata,
+ std::vector<std::string>
*ds_user_keys,
+ std::vector<TSDownStreamMeta>
*ds_metas,
+ std::vector<TimeSeriesMetadata>
*ds_series_metadatas) {
std::string prefix = internalKeyFromDownstreamKey(ns_src_key, src_metadata,
"");
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
auto iter = util::UniqueIterator(ctx, read_options);
- keys->clear();
- if (metas != nullptr) {
- metas->clear();
+ ds_user_keys->clear();
+ if (ds_metas != nullptr) {
+ ds_metas->clear();
+ }
+ if (ds_series_metadatas != nullptr) {
+ ds_series_metadatas->clear();
}
for (iter->Seek(lower_bound); iter->Valid() &&
iter->key().starts_with(prefix); iter->Next()) {
- keys->push_back(iter->key().ToString());
- if (metas != nullptr) {
+ auto key = downstreamKeyFromInternalKey(iter->key());
+ auto ns_key = AppendNamespacePrefix(key);
+ TimeSeriesMetadata metadata;
+ // Check if the downstream series exists
+ auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) {
+ if (s.IsNotFound()) continue;
+ return s;
+ }
+ auto [ns, src_key] = ExtractNamespaceKey(ns_src_key,
storage_->IsSlotIdEncoded());
+ if (metadata.source_key != src_key) {
+ // Inconsistent source key, skip this rule
+ continue;
+ }
+ ds_user_keys->push_back(key);
+ if (ds_metas != nullptr) {
TSDownStreamMeta meta;
Slice slice = iter->value().ToStringView();
meta.Decode(&slice);
- metas->push_back(meta);
+ ds_metas->push_back(meta);
+ }
+ if (ds_series_metadatas != nullptr) {
+ ds_series_metadatas->push_back(metadata);
}
}
return rocksdb::Status::OK();
@@ -1756,6 +1777,27 @@ rocksdb::Status
TimeSeries::getTSKeyByFilter(engine::Context &ctx, const TSMGetO
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::checkTSMetadataSourceExists(engine::Context &ctx,
const TimeSeriesMetadata &metadata,
+ bool &exists,
TimeSeriesMetadata *src_metadata) {
+ exists = false;
+ if (metadata.source_key.empty()) {
+ return rocksdb::Status::OK();
+ }
+ auto ns_key = AppendNamespacePrefix(metadata.source_key);
+ TimeSeriesMetadata source_metadata;
+ auto s = getTimeSeriesMetadata(ctx, ns_key, &source_metadata);
+ if (s.ok()) {
+ exists = true;
+ if (src_metadata != nullptr) {
+ *src_metadata = source_metadata;
+ }
+ } else if (s.IsNotFound()) {
+ exists = false;
+ return rocksdb::Status::OK();
+ }
+ return s;
+}
+
std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
uint64_t id) const {
std::string sub_key;
@@ -1864,6 +1906,22 @@ rocksdb::Status TimeSeries::Info(engine::Context &ctx,
const Slice &user_key, TS
}
auto chunk_count = res->metadata.size;
auto &metadata = res->metadata;
+ // Check source key is exist
+ bool source_exists = false;
+ TimeSeriesMetadata source_metadata;
+ s = checkTSMetadataSourceExists(ctx, res->metadata, source_exists,
&source_metadata);
+ if (!s.ok()) return s;
+ if (!source_exists) {
+ res->metadata.source_key.clear();
+ } else {
+ // Check source key has the rule
+ std::vector<std::string> ds_user_keys;
+ s = getDownStreamRules(ctx,
AppendNamespacePrefix(res->metadata.source_key), source_metadata,
&ds_user_keys);
+ if (!s.ok()) return s;
+ if (std::find(ds_user_keys.begin(), ds_user_keys.end(),
user_key.ToStringView()) == ds_user_keys.end()) {
+ res->metadata.source_key.clear();
+ }
+ }
// Approximate total samples
res->total_samples = chunk_count * res->metadata.chunk_size;
// TODO: Estimate disk usage for the field `memoryUsage`
@@ -1899,13 +1957,15 @@ rocksdb::Status TimeSeries::Info(engine::Context &ctx,
const Slice &user_key, TS
}
getLabelKVList(ctx, ns_key, metadata, &res->labels);
- // Retrieve downstream downstream_rules
+ // Retrieve downstream rules
std::vector<std::string> downstream_keys;
std::vector<TSDownStreamMeta> downstream_rules;
- getDownStreamRules(ctx, ns_key, metadata, &downstream_keys,
&downstream_rules);
+ s = getDownStreamRules(ctx, ns_key, metadata, &downstream_keys,
&downstream_rules);
+ if (!s.ok()) return s;
+ res->downstream_rules.clear();
+ res->downstream_rules.reserve(downstream_keys.size());
for (size_t i = 0; i < downstream_keys.size(); i++) {
- auto key = downstreamKeyFromInternalKey(downstream_keys[i]);
- res->downstream_rules.emplace_back(std::move(key),
std::move(downstream_rules[i]));
+ res->downstream_rules.emplace_back(std::move(downstream_keys[i]),
downstream_rules[i].aggregator);
}
return rocksdb::Status::OK();
@@ -1959,14 +2019,23 @@ rocksdb::Status TimeSeries::CreateRule(engine::Context
&ctx, const Slice &src_ke
return rocksdb::Status::OK();
}
- if (src_metadata.source_key.size()) {
+ // Check source has no source rule
+ bool exists = false;
+ s = checkTSMetadataSourceExists(ctx, src_metadata, exists);
+ if (!s.ok()) return s;
+ if (exists) {
*res = TSCreateRuleResult::kSrcHasSourceRule;
return rocksdb::Status::OK();
}
- if (dst_metadata.source_key.size()) {
+ // Check destination key has no source rule
+ exists = false;
+ s = checkTSMetadataSourceExists(ctx, dst_metadata, exists);
+ if (!s.ok()) return s;
+ if (exists && dst_metadata.source_key != src_key) {
*res = TSCreateRuleResult::kDstHasSourceRule;
return rocksdb::Status::OK();
}
+ // Check destination key has no destination rule
std::vector<std::string> dst_ds_keys;
s = getDownStreamRules(ctx, ns_dst_key, dst_metadata, &dst_ds_keys);
if (!s.ok()) return s;
@@ -1984,12 +2053,13 @@ rocksdb::Status TimeSeries::CreateRule(engine::Context
&ctx, const Slice &src_ke
TSDownStreamMeta downstream_metadata;
s = createDownStreamMetadataInBatch(ctx, ns_src_key, dst_key, src_metadata,
aggregator, batch, &downstream_metadata);
if (!s.ok()) return s;
- dst_metadata.SetSourceKey(src_key);
-
- std::string bytes;
- dst_metadata.Encode(&bytes);
- s = batch->Put(metadata_cf_handle_, ns_dst_key, bytes);
- if (!s.ok()) return s;
+ if (dst_metadata.source_key.empty() || dst_metadata.source_key != src_key) {
+ dst_metadata.SetSourceKey(src_key);
+ std::string bytes;
+ dst_metadata.Encode(&bytes);
+ s = batch->Put(metadata_cf_handle_, ns_dst_key, bytes);
+ if (!s.ok()) return s;
+ }
*res = TSCreateRuleResult::kOK;
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
@@ -2143,7 +2213,8 @@ rocksdb::Status TimeSeries::Del(engine::Context &ctx,
const Slice &user_key, uin
// Get downstream rules
std::vector<std::string> ds_keys;
std::vector<TSDownStreamMeta> ds_metas;
- s = getDownStreamRules(ctx, ns_key, metadata, &ds_keys, &ds_metas);
+ std::vector<TimeSeriesMetadata> ds_series_metas;
+ s = getDownStreamRules(ctx, ns_key, metadata, &ds_keys, &ds_metas,
&ds_series_metas);
if (!s.ok()) return s;
// Check retention and compaction rules
@@ -2166,7 +2237,7 @@ rocksdb::Status TimeSeries::Del(engine::Context &ctx,
const Slice &user_key, uin
s = delRangeCommon(ctx, ns_key, metadata, from, to, deleted);
if (!s.ok()) return s;
if (*deleted == 0) return rocksdb::Status::OK();
- s = delRangeDownStream(ctx, ns_key, metadata, ds_keys, ds_metas, from, to);
+ s = delRangeDownStream(ctx, ns_key, metadata, ds_keys, ds_metas,
ds_series_metas, from, to);
return s;
}
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index e0e5e5cb0..8520366e8 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -163,7 +163,7 @@ struct TSInfoResult {
uint64_t memory_usage;
uint64_t first_timestamp;
uint64_t last_timestamp;
- std::vector<std::pair<std::string, TSDownStreamMeta>> downstream_rules;
+ std::vector<std::pair<std::string, TSAggregator>> downstream_rules;
LabelKVList labels;
};
@@ -320,8 +320,8 @@ class TimeSeries : public SubKeyScanner {
uint64_t from, uint64_t to,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
uint64_t *deleted, bool inclusive_to =
true);
rocksdb::Status delRangeDownStream(engine::Context &ctx, const Slice
&ns_key, TimeSeriesMetadata &metadata,
- std::vector<std::string> &ds_keys,
std::vector<TSDownStreamMeta> &ds_metas,
- uint64_t from, uint64_t to);
+ std::vector<std::string> &ds_user_keys,
std::vector<TSDownStreamMeta> &ds_metas,
+ std::vector<TimeSeriesMetadata>
&ds_series_metas, uint64_t from, uint64_t to);
rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const
TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels);
@@ -330,12 +330,19 @@ class TimeSeries : public SubKeyScanner {
const TSAggregator
&aggregator,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
TSDownStreamMeta
*ds_metadata);
+ // Get downstream rules of the source time series.
+ // - `ds_user_keys`: the user keys of the downstream time series.
+ // - `ds_metas`: (optional) the downstream rule meta.
+ // - `ds_series_metadatas`: (optional) the metadata of the downstream time
series.
rocksdb::Status getDownStreamRules(engine::Context &ctx, const Slice
&ns_src_key,
- const TimeSeriesMetadata &src_metadata,
std::vector<std::string> *keys,
- std::vector<TSDownStreamMeta> *metas =
nullptr);
+ const TimeSeriesMetadata &src_metadata,
std::vector<std::string> *ds_user_keys,
+ std::vector<TSDownStreamMeta> *ds_metas =
nullptr,
+ std::vector<TimeSeriesMetadata>
*ds_series_metadatas = nullptr);
rocksdb::Status getTSKeyByFilter(engine::Context &ctx, const
TSMGetOption::FilterOption &filter,
std::vector<std::string> *user_keys,
std::vector<LabelKVList> *labels_vec = nullptr,
std::vector<TimeSeriesMetadata> *metas =
nullptr);
+ rocksdb::Status checkTSMetadataSourceExists(engine::Context &ctx, const
TimeSeriesMetadata &metadata, bool &exists,
+ TimeSeriesMetadata *src_metadata
= nullptr);
std::string internalKeyFromChunkID(const Slice &ns_key, const
TimeSeriesMetadata &metadata, uint64_t id) const;
std::string internalKeyFromLabelKey(const Slice &ns_key, const
TimeSeriesMetadata &metadata, Slice label_key) const;
diff --git a/tests/cppunit/types/timeseries_test.cc
b/tests/cppunit/types/timeseries_test.cc
index 837e8d0ea..2d341b51a 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -468,6 +468,65 @@ TEST_F(TimeSeriesTest, CreateRuleErrorCases) {
}
}
+TEST_F(TimeSeriesTest, CreateRuleDynamicCases) {
+ redis::TSCreateOption option;
+ std::string src_key = "createrule_dynamic_case_src";
+ std::string dst_key1 = "createrule_dynamic_case_dst1";
+ std::string dst_key2 = "createrule_dynamic_case_dst2";
+
+ // Create source and destination keys
+ EXPECT_TRUE(ts_db_->Create(*ctx_, src_key, option).ok());
+ EXPECT_TRUE(ts_db_->Create(*ctx_, dst_key1, option).ok());
+ EXPECT_TRUE(ts_db_->Create(*ctx_, dst_key2, option).ok());
+
+ // Create a rule
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+ redis::TSAggregator aggregator;
+ aggregator.type = redis::TSAggregatorType::AVG;
+ aggregator.bucket_duration = 1000;
+ EXPECT_TRUE(ts_db_->CreateRule(*ctx_, src_key, dst_key1, aggregator,
&res).ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+ EXPECT_TRUE(ts_db_->CreateRule(*ctx_, src_key, dst_key2, aggregator,
&res).ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+
+ // Delete a destination key
+ EXPECT_TRUE(static_cast<redis::Database *>(ts_db_.get())->Del(*ctx_,
dst_key1).ok());
+ redis::TSInfoResult info;
+ // Check downstream rule is updated
+ EXPECT_TRUE(ts_db_->Info(*ctx_, src_key, &info).ok());
+ EXPECT_EQ(info.downstream_rules.size(), 1);
+ EXPECT_EQ(info.downstream_rules[0].first, dst_key2);
+
+ // Delete the source key
+ EXPECT_TRUE(static_cast<redis::Database *>(ts_db_.get())->Del(*ctx_,
src_key).ok());
+ // Check the destination key no longer has source rules
+ EXPECT_TRUE(ts_db_->Info(*ctx_, dst_key2, &info).ok());
+ EXPECT_TRUE(info.metadata.source_key.empty());
+
+ // Recreate the source key and add a new rule to the existing destination key
+ EXPECT_TRUE(ts_db_->Create(*ctx_, src_key, option).ok());
+ EXPECT_TRUE(ts_db_->Info(*ctx_, src_key, &info).ok());
+ EXPECT_EQ(info.downstream_rules.size(), 0);
+ EXPECT_TRUE(ts_db_->Info(*ctx_, dst_key2, &info).ok());
+ EXPECT_TRUE(info.metadata.source_key.empty());
+ // Add rule to dst_key2
+ EXPECT_TRUE(ts_db_->CreateRule(*ctx_, src_key, dst_key2, aggregator,
&res).ok());
+ EXPECT_EQ(res, redis::TSCreateRuleResult::kOK);
+ // Check if updated correctly
+ EXPECT_TRUE(ts_db_->Info(*ctx_, dst_key2, &info).ok());
+ EXPECT_EQ(info.metadata.source_key, src_key);
+ EXPECT_TRUE(ts_db_->Info(*ctx_, src_key, &info).ok());
+ EXPECT_EQ(info.downstream_rules.size(), 1);
+ EXPECT_EQ(info.downstream_rules[0].first, dst_key2);
+
+ // Delete and recreate the destination key
+ EXPECT_TRUE(static_cast<redis::Database *>(ts_db_.get())->Del(*ctx_,
dst_key2).ok());
+ EXPECT_TRUE(ts_db_->Create(*ctx_, dst_key2, option).ok());
+ // Check the source key no longer has downstream rules
+ EXPECT_TRUE(ts_db_->Info(*ctx_, src_key, &info).ok());
+ EXPECT_EQ(info.downstream_rules.size(), 0);
+}
+
TEST_F(TimeSeriesTest, AggregationOnEmptySeries) {
redis::TSCreateOption option;
option.chunk_size = 3;