This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new b88ff7c1b feat(ts): add the support of TS.MREVRANGE command (#3245)
b88ff7c1b is described below
commit b88ff7c1b58006a41fdeac96295efb4140f38912
Author: Raja Nandhan <[email protected]>
AuthorDate: Mon Nov 3 01:16:09 2025 -0600
feat(ts): add the support of TS.MREVRANGE command (#3245)
It closes #3212.
---------
Co-authored-by: var-nan <[email protected]>
---
src/commands/cmd_timeseries.cc | 19 +++++++++--
src/types/redis_timeseries.cc | 9 ++++++
src/types/redis_timeseries.h | 1 +
.../gocase/unit/type/timeseries/timeseries_test.go | 37 ++++++++++++++++++++++
4 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 0602327f6..8797dd89e 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -983,7 +983,7 @@ class CommandTSMRange : public CommandTSRangeBase, public
CommandTSMGetBase {
}
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSMRangeResult> results;
- auto s = timeseries_db.MRange(ctx, option_, &results);
+ auto s = executeCommand(ctx, timeseries_db, &results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
std::vector<std::string> reply;
@@ -1058,8 +1058,22 @@ class CommandTSMRange : public CommandTSRangeBase,
public CommandTSMGetBase {
return Status::OK();
}
- private:
TSMRangeOption option_;
+
+ private:
+ virtual rocksdb::Status executeCommand(engine::Context &ctx, TimeSeries &ts,
std::vector<TSMRangeResult> *results) {
+ return ts.MRange(ctx, option_, results);
+ }
+};
+
+class CommandTSMRevRange : public CommandTSMRange {
+ public:
+ CommandTSMRevRange() = default;
+
+ private:
+ rocksdb::Status executeCommand(engine::Context &ctx, TimeSeries &ts,
std::vector<TSMRangeResult> *results) override {
+ return ts.MRevRange(ctx, option_, results);
+ }
};
class CommandTSIncrByDecrBy : public CommandTSCreateBase {
@@ -1183,6 +1197,7 @@ REDIS_REGISTER_COMMANDS(Timeseries,
MakeCmdAttr<CommandTSCreate>("ts.create", -2
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6,
"write", 1, 2, 1),
MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only",
NO_KEY),
MakeCmdAttr<CommandTSMRange>("ts.mrange", -5,
"read-only", NO_KEY),
+ MakeCmdAttr<CommandTSMRevRange>("ts.mrevrange", -5,
"read-only", NO_KEY),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3,
"write", 1, 1, 1),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3,
"write", 1, 1, 1),
MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1,
1), );
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index c5faae361..54ea1933d 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -2180,6 +2180,15 @@ rocksdb::Status TimeSeries::MRange(engine::Context &ctx,
const TSMRangeOption &o
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::MRevRange(engine::Context &ctx, const
TSMRangeOption &option,
+ std::vector<TSMRangeResult> *res) {
+ auto s = MRange(ctx, option, res);
+ if (res) {
+ for (auto &row : *res) std::reverse(row.samples.begin(),
row.samples.end());
+ }
+ return s;
+}
+
rocksdb::Status TimeSeries::IncrBy(engine::Context &ctx, const Slice
&user_key, TSSample sample,
const TSCreateOption &option, AddResult
*res) {
std::string ns_key = AppendNamespacePrefix(user_key);
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 45a99bb0b..460cbe2b5 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -282,6 +282,7 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool
is_return_latest,
std::vector<TSMGetResult> *res);
rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option,
std::vector<TSMRangeResult> *res);
+ rocksdb::Status MRevRange(engine::Context &ctx, const TSMRangeOption
&option, std::vector<TSMRangeResult> *res);
rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample
sample, const TSCreateOption &option,
AddResult *res);
rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t
from, uint64_t to, uint64_t *deleted);
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index ee116e536..ffda89e6f 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -824,6 +824,24 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
for i, s := range samples {
require.Equal(t, expectSamples[i],
s.([]interface{}))
}
+
+ // Test MREVRANGE
+ res2 := rdb.Do(ctx, "ts.mrevrange", "-", "+",
"WITHLABELS", "FILTER", "type="+type_label, "GROUPBY", "type", "REDUCE",
"max").Val().([]interface{})
+ require.Equal(t, 1, len(res))
+
+ group2 := res2[0].([]interface{})
+ require.Equal(t, "type=stock_MRange", group2[0])
+
+ metadata2 := group2[1].([]interface{})
+ labels2 := metadata2[0].([]interface{})
+ require.Equal(t, []interface{}{"type", type_label},
labels2)
+ require.Equal(t, "max", metadata2[1].([]interface{})[1])
+
+ samples2 := group2[2].([]interface{})
+ require.Equal(t, 3, len(samples2))
+ for i, s := range samples2 {
+ require.Equal(t,
expectSamples[len(expectSamples)-1-i], s.([]interface{}))
+ }
})
t.Run("With Aggregation", func(t *testing.T) {
@@ -859,6 +877,25 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
for i, s := range samples {
require.Equal(t, expectSamples[i],
s.([]interface{}))
}
+
+ // Test MREVRANGE
+ res2 := rdb.Do(ctx, "ts.mrevrange", "-", "+",
"WITHLABELS", "AGGREGATION", "avg", "1000", "FILTER", "type="+type_label,
"GROUPBY", "type", "REDUCE", "max").Val().([]interface{})
+ require.Equal(t, 1, len(res2))
+
+ name2 := res2[0].([]interface{})[0].(string)
+ require.Equal(t, name, name2)
+ labels2 := res2[0].([]interface{})[1].([]interface{})
+ require.Equal(t, 3, len(labels))
+ require.Equal(t, labels[0].([]interface{}),
labels2[0].([]interface{}))
+ require.Equal(t, labels[1].([]interface{}),
labels2[1].([]interface{}))
+ require.Equal(t, labels[2].([]interface{}),
labels2[2].([]interface{}))
+
+ samples2 := res2[0].([]interface{})[2].([]interface{})
+ require.Equal(t, 3, len(samples))
+ for i, s := range samples2 {
+ require.Equal(t,
expectSamples[len(expectSamples)-1-i], s.([]interface{}))
+ }
+
})
t.Run("Filter By Value", func(t *testing.T) {