This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 2dc7e9ea6 feat(ts): add the support of TS.ALTER command (#3264)
2dc7e9ea6 is described below

commit 2dc7e9ea623b4009e4522b02f4b3ff9ba22618c8
Author: Raja Nandhan <[email protected]>
AuthorDate: Fri Nov 21 02:04:00 2025 -0600

    feat(ts): add the support of TS.ALTER command (#3264)
    
    It closes #3214.
    
    ---------
    
    Co-authored-by: var-nan <[email protected]>
    Co-authored-by: DeEMO <[email protected]>
---
 src/commands/cmd_timeseries.cc                     | 48 +++++++++++++-
 src/types/redis_timeseries.cc                      | 62 ++++++++++++++++++
 src/types/redis_timeseries.h                       |  9 +++
 tests/cppunit/types/timeseries_test.cc             | 76 ++++++++++++++++++++++
 .../gocase/unit/type/timeseries/timeseries_test.go | 10 +++
 5 files changed, 204 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 7b5024810..39e5becde 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -315,7 +315,6 @@ class CommandTSCreateBase : public KeywordCommandBase {
     return Status::OK();
   }
 
- private:
   TSCreateOption create_option_;
 };
 
@@ -342,6 +341,52 @@ class CommandTSCreate : public CommandTSCreateBase {
   }
 };
 
+class CommandTSAlter : public CommandTSCreateBase {
+ public:
+  CommandTSAlter() { registerDefaultHandlers(); }
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() < 2) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+    CommandTSCreateBase::setSkipNum(2);
+    return CommandTSCreateBase::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
+    if (!sc.IsOK()) return sc;
+
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    auto s = timeseries_db.Alter(ctx, args_[1], getCreateOption(), mask_);
+    if (!s.ok() && s.IsInvalidArgument()) return {Status::RedisExecErr, 
errKeyNotFound};
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+    *output = redis::RESP_OK;
+    return Status::OK();
+  }
+
+ private:
+  uint8_t mask_ = 0;
+
+  void registerDefaultHandlers() override {
+    using AlterMode = std::underlying_type<TSAlterMode>::type;
+    registerHandler("RETENTION", [this](TSOptionsParser &parser) {
+      mask_ |= static_cast<AlterMode>(TSAlterMode::RETENTION);
+      return handleRetention(parser, create_option_.retention_time);
+    });
+    registerHandler("CHUNK_SIZE", [this](TSOptionsParser &parser) {
+      mask_ |= static_cast<AlterMode>(TSAlterMode::CHUNK_SIZE);
+      return handleChunkSize(parser, create_option_.chunk_size);
+    });
+    registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) {
+      mask_ |= static_cast<AlterMode>(TSAlterMode::DUPLICATE_POLICY);
+      return handleDuplicatePolicy(parser, create_option_.duplicate_policy);
+    });
+    registerHandler("LABELS", [this](TSOptionsParser &parser) {
+      mask_ |= static_cast<AlterMode>(TSAlterMode::LABELS);
+      return handleLabels(parser, create_option_.labels);
+    });
+  }
+};
+
 class CommandTSInfo : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override { return 
Commander::Parse(args); }
@@ -1227,6 +1272,7 @@ class CommandTSQueryIndex : public Commander {
 };
 
 REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", 
-2, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandTSAlter>("ts.alter", -2, "write", 
1, 1, 1),
                         MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, 
-3, 1),
                         MakeCmdAttr<CommandTSRange>("ts.range", -4, 
"read-only", 1, 1, 1),
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index cb7dde33f..fa14fe039 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -1860,6 +1860,68 @@ rocksdb::Status TimeSeries::Create(engine::Context &ctx, 
const Slice &user_key,
   return createTimeSeries(ctx, ns_key, &metadata, &option);
 }
 
+rocksdb::Status TimeSeries::Alter(engine::Context &ctx, const Slice &user_key, 
const TSCreateOption &option,
+                                  uint8_t mask) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  TimeSeriesMetadata metadata;
+  if (auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata); !s.ok()) {
+    return rocksdb::Status::InvalidArgument("key not exists");
+  }
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTimeSeries, {"Alter"});
+  if (auto s = batch->PutLogData(log_data.Encode()); !s.ok()) return s;
+
+  using AlterMode = std::underlying_type<TSAlterMode>::type;
+  bool update_retention = mask & 
static_cast<AlterMode>(TSAlterMode::RETENTION);
+  bool update_chunk_size = mask & 
static_cast<AlterMode>(TSAlterMode::CHUNK_SIZE);
+  bool update_duplicate_policy = mask & 
static_cast<AlterMode>(TSAlterMode::DUPLICATE_POLICY);
+  bool update_labels = mask & static_cast<AlterMode>(TSAlterMode::LABELS);
+
+  if (update_retention || update_chunk_size || update_duplicate_policy) {
+    metadata.retention_time = update_retention ? option.retention_time : 
metadata.retention_time;
+    metadata.chunk_size = update_chunk_size ? option.chunk_size : 
metadata.chunk_size;
+    metadata.duplicate_policy = update_duplicate_policy ? 
option.duplicate_policy : metadata.duplicate_policy;
+    std::string bytes;
+    metadata.Encode(&bytes);
+    if (auto s = batch->Put(metadata_cf_handle_, ns_key, bytes); !s.ok()) 
return s;
+  }
+  if (update_labels) {
+    LabelKVList prev_labels;
+    if (auto s = getLabelKVList(ctx, ns_key, metadata, &prev_labels); !s.ok()) 
return s;
+
+    std::unordered_map<std::string, std::pair<std::string, bool>> labels_map;  
// True : (key,val) should be updated
+    for (auto &label : option.labels) {
+      labels_map.insert(std::make_pair(label.k, std::make_pair(label.v, 
true)));
+    }
+    for (auto &label : prev_labels) {
+      if (auto it = labels_map.find(label.k); it != labels_map.end()) {
+        it->second.second = it->second.first != label.v;
+        if (it->second.second) {  // Remove reverse index key. Val is updated 
later.
+          auto rev_index_key = TSRevLabelKey(namespace_, label.k, label.v, 
user_key).Encode();
+          if (auto s = batch->Delete(index_cf_handle_, rev_index_key); 
!s.ok()) return s;
+        }  // Else key-val pair unchanged
+      } else {  // Remove label.
+        auto internal_key = internalKeyFromLabelKey(ns_key, metadata, label.k);
+        if (auto s = batch->Delete(internal_key); !s.ok()) return s;
+
+        auto rev_index_key = TSRevLabelKey(namespace_, label.k, label.v, 
user_key).Encode();
+        if (auto s = batch->Delete(index_cf_handle_, rev_index_key); !s.ok()) 
return s;
+      }
+    }
+
+    for (auto &[k, v] : labels_map) {
+      if (v.second) {  // Update label and insert reverse-index
+        auto internal_key = internalKeyFromLabelKey(ns_key, metadata, k);
+        if (auto s = batch->Put(internal_key, v.first); !s.ok()) return s;
+
+        auto rev_index_key = TSRevLabelKey(namespace_, k, v.first, 
user_key).Encode();
+        if (auto s = batch->Put(index_cf_handle_, rev_index_key, Slice()); 
!s.ok()) return s;
+      }
+    }
+  }
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
 rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, 
TSSample sample,
                                 const TSCreateOption &option, AddResult *res, 
const DuplicatePolicy *on_dup_policy) {
   std::string ns_key = AppendNamespacePrefix(user_key);
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 92a104657..768b8a48f 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -253,6 +253,14 @@ enum class TSCreateRuleResult : uint8_t {
   kSrcEqDst = 6,
 };
 
+enum class TSAlterMode : uint8_t {
+  RETENTION = 1,
+  CHUNK_SIZE = 1 << 1,
+  DUPLICATE_POLICY = 1 << 2,
+  IGNORE = 1 << 3,
+  LABELS = 1 << 4,
+};
+
 std::vector<TSSample> GroupSamplesAndReduce(const 
std::vector<std::vector<TSSample>> &all_samples,
                                             TSMRangeOption::GroupReducerType 
reducer_type);
 
@@ -267,6 +275,7 @@ class TimeSeries : public SubKeyScanner {
   TimeSeries(engine::Storage *storage, const std::string &ns)
       : SubKeyScanner(storage, ns), 
index_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Index)) {}
   rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const 
TSCreateOption &option);
+  rocksdb::Status Alter(engine::Context &ctx, const Slice &user_key, const 
TSCreateOption &option, uint8_t mask);
   rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample 
sample, const TSCreateOption &option,
                       AddResult *res, const DuplicatePolicy *on_dup_policy = 
nullptr);
   rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, 
std::vector<TSSample> samples,
diff --git a/tests/cppunit/types/timeseries_test.cc 
b/tests/cppunit/types/timeseries_test.cc
index a268e6706..5937a9288 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -53,6 +53,82 @@ TEST_F(TimeSeriesTest, Create) {
   EXPECT_EQ(s.ToString(), "Invalid argument: key already exists");
 }
 
+TEST_F(TimeSeriesTest, Alter) {
+  redis::TSCreateOption option;
+  option.retention_time = 3600;
+  option.chunk_size = 1024;
+  option.labels = {{"type", "runtime"}, {"compiler", "gcc"}, {"machine", 
"Linux"}};
+  key_ = "pa";
+
+  EXPECT_TRUE(ts_db_->Create(*ctx_, key_, option).ok());
+
+  redis::TSInfoResult res;
+  option.retention_time = 200;
+  EXPECT_TRUE(ts_db_->Alter(*ctx_, key_, option, 
static_cast<uint8_t>(redis::TSAlterMode::RETENTION)).ok());
+  EXPECT_TRUE(ts_db_->Info(*ctx_, key_, &res).ok());
+  EXPECT_EQ(res.metadata.retention_time, 200);
+  EXPECT_EQ(res.metadata.chunk_size, 1024);
+  EXPECT_EQ(res.labels.size(), 3);
+
+  // Update chunk size and verify other fields are unaffected.
+  option.chunk_size = 128;
+  EXPECT_TRUE(ts_db_->Alter(*ctx_, key_, option, 
static_cast<uint8_t>(redis::TSAlterMode::CHUNK_SIZE)).ok());
+  ts_db_->Info(*ctx_, key_, &res);
+  EXPECT_EQ(res.metadata.retention_time, 200);
+  EXPECT_EQ(res.metadata.chunk_size, 128);
+  EXPECT_EQ(res.labels.size(), 3);
+
+  // Verify records are properly inserted.
+  std::vector<TSSample> samples = {{10, 23}, {12, 24.5}};
+  std::vector<TSChunk::AddResult> results;
+  results.resize(samples.size());
+  EXPECT_TRUE(ts_db_->MAdd(*ctx_, key_, samples, &results).ok());
+  EXPECT_EQ(results[0].sample.ts, 10);
+  EXPECT_EQ(results[1].sample.ts, 12);
+
+  // Update labels and verify
+  res.labels.clear();
+  option.labels = {{"compiler", "gcc_12"}, {"version", "123"}};
+  EXPECT_TRUE(ts_db_->Alter(*ctx_, key_, option, 
static_cast<uint8_t>(redis::TSAlterMode::LABELS)).ok());
+  EXPECT_TRUE(ts_db_->Info(*ctx_, key_, &res).ok());
+  EXPECT_EQ(res.metadata.retention_time, 200);
+  EXPECT_EQ(res.metadata.chunk_size, 128);
+  EXPECT_EQ(res.labels.size(), 2);
+  redis::LabelKVPair first = {"version", "123"}, second = {"compiler", 
"gcc_12"};
+  EXPECT_TRUE(std::find_if(res.labels.begin(), res.labels.end(), [first](const 
auto &label) {
+                return first.k == label.k && first.v == label.v;
+              }) != res.labels.end());
+  EXPECT_TRUE(std::find_if(res.labels.begin(), res.labels.end(), 
[second](const auto &label) {
+                return second.k == label.k && second.v == label.v;
+              }) != res.labels.end());
+
+  // Verify reverse-indexes are properly updated.
+  key_ = "pavani";
+  redis::TSCreateOption second_option;
+  second_option.labels = {{"compiler", "gcc_12"}, {"Desktop", "Lubuntu"}};
+  EXPECT_TRUE(ts_db_->Create(*ctx_, key_, second_option).ok());
+
+  redis::TSMGetOption::FilterOption filters;
+  filters.labels_equals = {{"compiler", {"gcc_12"}}};
+  std::vector<std::string> query_res;
+  EXPECT_TRUE(ts_db_->QueryIndex(*ctx_, filters, &query_res).ok());
+  EXPECT_EQ(query_res.size(), 2);
+  EXPECT_TRUE(std::find(query_res.begin(), query_res.end(), "pa") != 
query_res.end());
+  EXPECT_TRUE(std::find(query_res.begin(), query_res.end(), "pavani") != 
query_res.end());
+
+  // old labels should be deleted.
+  filters.labels_equals.clear();
+  filters.labels_equals = {{"machine", {"Linux"}}};
+  query_res.clear();
+  EXPECT_TRUE(ts_db_->QueryIndex(*ctx_, filters, &query_res).ok());
+  EXPECT_TRUE(query_res.empty());
+
+  key_ = "pavni";
+  auto s = ts_db_->Alter(*ctx_, key_, option, 1);
+  EXPECT_FALSE(s.ok());
+  EXPECT_EQ(s.ToString(), "Invalid argument: key not exists");
+}
+
 TEST_F(TimeSeriesTest, Add) {
   redis::TSCreateOption option;
   option.retention_time = 3600;
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go 
b/tests/gocase/unit/type/timeseries/timeseries_test.go
index d1ddf9405..b0d9a1936 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -167,6 +167,16 @@ func testTimeSeries(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.Equal(t, int64(2), vals[11])
        })
 
+       t.Run("TS.ALTER Verify Updates", func(t *testing.T) {
+               require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", 
"3600", "chunk_size", "1024",
+                       "LABELS", "type", "runtime", "compiler", "gcc", 
"machine", "gcc").Err())
+
+               require.NoError(t, rdb.Do(ctx, "ts.alter", key, "retention", 
"200").Err())
+               vals, err := rdb.Do(ctx, "ts.info", key).Slice()
+               require.NoError(t, err)
+               require.Equal(t, int64(200), vals[9])
+       })
+
        t.Run("TS.ADD Basic Add", func(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, key).Err())
                require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())

Reply via email to