This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 589b01899 feat(ts): add the support of TS.QUERYINDEX command (#3246)
589b01899 is described below
commit 589b018991de45f52c28b2107f8ff72fffed2766
Author: Kenny Lau <[email protected]>
AuthorDate: Wed Nov 5 01:44:38 2025 -0500
feat(ts): add the support of TS.QUERYINDEX command (#3246)
It closes #3213.
Co-authored-by: RX Xiao <[email protected]>
---
src/commands/cmd_timeseries.cc | 42 +++++++++++++++++++++-
src/types/redis_timeseries.cc | 5 +++
src/types/redis_timeseries.h | 2 ++
.../gocase/unit/type/timeseries/timeseries_test.go | 16 +++++++++
4 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 8797dd89e..7b5024810 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -1187,6 +1187,45 @@ class CommandTSDel : public Commander {
uint64_t end_ts_ = TSSample::MAX_TIMESTAMP;
};
+class CommandTSQueryIndex : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 2) {
+ return {Status::RedisParseErr, "wrong number of arguments for
'ts.queryindex' command"};
+ }
+ CommandParser parser(args, 1);
+ // Parse filterExpr
+ auto filter_parser = TSMQueryFilterParser(filter_option_);
+ while (parser.Good()) {
+ auto s = filter_parser.Parse(parser.TakeStr().GetValue());
+ if (!s.IsOK()) return s;
+ }
+ return filter_parser.Check();
+ }
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ if (srv->GetConfig()->cluster_enabled) {
+ return {Status::RedisExecErr, "TS.QueryIndex is not supported in cluster
mode"};
+ }
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ std::vector<std::string> results;
+ auto s = timeseries_db.QueryIndex(ctx, getQueryIndexOption(), &results);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+ std::vector<std::string> reply;
+ reply.reserve(results.size());
+ for (auto &result : results) {
+ reply.push_back(redis::BulkString(result));
+ }
+ *output = redis::Array(reply);
+ return Status::OK();
+ }
+
+ protected:
+ const TSMGetOption::FilterOption &getQueryIndexOption() const { return
filter_option_; }
+
+ private:
+ TSMGetOption::FilterOption filter_option_;
+};
+
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create",
-2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1,
1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1,
-3, 1),
@@ -1200,6 +1239,7 @@ REDIS_REGISTER_COMMANDS(Timeseries,
MakeCmdAttr<CommandTSCreate>("ts.create", -2
MakeCmdAttr<CommandTSMRevRange>("ts.mrevrange", -5,
"read-only", NO_KEY),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3,
"write", 1, 1, 1),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3,
"write", 1, 1, 1),
- MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1,
1), );
+ MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1,
1),
+ MakeCmdAttr<CommandTSQueryIndex>("ts.queryindex", -2,
"read-only", NO_KEY), );
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 54ea1933d..cb7dde33f 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -2295,4 +2295,9 @@ rocksdb::Status TimeSeries::IsTSSubKeyExpired(const
TimeSeriesMetadata &metadata
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::QueryIndex(engine::Context &ctx, const
TSMGetOption::FilterOption &filter_option,
+ std::vector<std::string> *res) {
+ return getTSKeyByFilter(ctx, filter_option, res);
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 460cbe2b5..92a104657 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -288,6 +288,8 @@ class TimeSeries : public SubKeyScanner {
rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t
from, uint64_t to, uint64_t *deleted);
rocksdb::Status IsTSSubKeyExpired(const TimeSeriesMetadata &metadata, const
Slice &key, const Slice &value,
bool &expired);
+ rocksdb::Status QueryIndex(engine::Context &ctx, const
TSMGetOption::FilterOption &filter_option,
+ std::vector<std::string> *res);
static bool ExtractTSSubType(const InternalKey &ikey, TSSubkeyType *type);
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index ffda89e6f..d1ddf9405 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -1003,4 +1003,20 @@ func testTimeSeries(t *testing.T, configs
util.KvrocksServerConfigs) {
_, err = rdb.Do(ctx, "ts.del", srcKey, "-", "+").Result()
require.ErrorContains(t, err, "When a series has compactions,
deleting samples or compaction buckets beyond the series retention period is
not possible")
})
+
+ t.Run("TS.QUERYINDEX", func(t *testing.T) {
+ // Create test based on example in Redis documentation
+ require.NoError(t, rdb.Do(ctx, "ts.create",
"telemetry:study:temperature", "LABELS", "room", "study", "type",
"temperature").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create",
"telemetry:study:humidity", "LABELS", "room", "study", "type",
"humidity").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create",
"telemetry:kitchen:temperature", "LABELS", "room", "kitchen", "type",
"temperature").Err())
+ require.NoError(t, rdb.Do(ctx, "ts.create",
"telemetry:kitchen:humidity", "LABELS", "room", "kitchen", "type",
"humidity").Err())
+
+ res, err := rdb.Do(ctx, "ts.queryindex",
"room=kitchen").Result()
+ require.NoError(t, err)
+ assert.Equal(t, []interface{}{"telemetry:kitchen:humidity",
"telemetry:kitchen:temperature"}, res)
+
+ res, err = rdb.Do(ctx, "ts.queryindex",
"type=temperature").Result()
+ require.NoError(t, err)
+ assert.Equal(t, []interface{}{"telemetry:kitchen:temperature",
"telemetry:study:temperature"}, res)
+ })
}