This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 9d656f14 Add the ApplyBatch command for data migration scenario (#2010)
9d656f14 is described below

commit 9d656f1476583651e2fae20ed1c37cf35d0679ca
Author: Myth <[email protected]>
AuthorDate: Sun Jan 14 15:59:07 2024 +0800

    Add the ApplyBatch command for data migration scenario (#2010)
    
    Co-authored-by: git-hulk <[email protected]>
---
 src/commands/cmd_server.cc                      | 36 ++++++++++++++-
 src/storage/storage.cc                          |  8 ++--
 src/storage/storage.h                           |  1 +
 tests/gocase/unit/applybatch/applybatch_test.go | 58 +++++++++++++++++++++++++
 4 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d921c766..e0700af8 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1255,6 +1255,39 @@ class CommandReset : public Commander {
   }
 };
 
+class CommandApplyBatch : public Commander {  // APPLYBATCH <raw_batch> [LOWPRI]: hands a client-supplied serialized write batch to the storage layer.
+ public:
+  Status Parse(const std::vector<std::string> &args) override {  // Validates the arguments and stashes the batch payload plus the optional flag.
+    raw_batch_ = args[1];  // NOTE(review): assumes args[1] exists; presumably guaranteed by the command's registered arity - confirm.
+    if (args.size() > 2) {
+      if (args.size() > 3) {  // at most one option may follow the batch
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+      if (!util::EqualICase(args[2], "lowpri")) {  // "lowpri" is the only accepted option (compared via util::EqualICase)
+        return {Status::RedisParseErr, "only support LOWPRI option"};
+      }
+      low_pri_ = true;
+    }
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {  // Applies the batch; replies with its size in bytes, or an exec error carrying the storage message.
+    size_t size = raw_batch_.size();  // captured first: raw_batch_ is moved from below
+    auto options = svr->storage->DefaultWriteOptions();
+    options.low_pri = low_pri_;  // true only when the LOWPRI option was parsed
+    auto s = svr->storage->ApplyWriteBatch(options, std::move(raw_batch_));
+    if (!s.IsOK()) {
+      return {Status::RedisExecErr, s.Msg()};
+    }
+    *output = redis::Integer(size);  // integer reply: byte length of the submitted batch
+    return Status::OK();
+  }
+
+ private:
+  std::string raw_batch_;  // batch payload taken from args[1]; left moved-from after Execute
+  bool low_pri_ = false;  // set by Parse when the LOWPRI option is present
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading", 0, 0, 0),
                         MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0, 0, 0),
                         MakeCmdAttr<CommandSelect>("select", 2, "read-only", 0, 0, 0),
@@ -1291,5 +1324,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loadin
                         MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 0, 0),
                         MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", 0, 0, 0),
                         MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 0),
-                        MakeCmdAttr<CommandReset>("reset", -1, "multi pub-sub", 0, 0, 0), )
+                        MakeCmdAttr<CommandReset>("reset", -1, "multi pub-sub", 0, 0, 0),
+                        MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", 0, 0, 0), )
 }  // namespace redis
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 08e79076..a600813f 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -661,16 +661,18 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
 }
 
 Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
+  return ApplyWriteBatch(write_opts_, std::move(raw_batch));
+}
+
+Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
   if (db_size_limit_reached_) {
     return {Status::NotOK, "reach space limit"};
   }
-
   auto batch = rocksdb::WriteBatch(std::move(raw_batch));
-  auto s = db_->Write(write_opts_, &batch);
+  auto s = db_->Write(options, &batch);
   if (!s.ok()) {
     return {Status::NotOK, s.ToString()};
   }
-
   return Status::OK();
 }
 
diff --git a/src/storage/storage.h b/src/storage/storage.h
index f0134cbf..7e37d82f 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -132,6 +132,7 @@ class Storage {
   Status RestoreFromCheckpoint();
   Status GetWALIter(rocksdb::SequenceNumber seq, std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
   Status ReplicaApplyWriteBatch(std::string &&raw_batch);
+  Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch);
   rocksdb::SequenceNumber LatestSeqNumber();
 
   [[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, const rocksdb::Slice &key, std::string *value);
diff --git a/tests/gocase/unit/applybatch/applybatch_test.go b/tests/gocase/unit/applybatch/applybatch_test.go
new file mode 100644
index 00000000..275b8663
--- /dev/null
+++ b/tests/gocase/unit/applybatch/applybatch_test.go
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package applybatch
+
+import (
+       "context"
+       "encoding/hex"
+       "testing"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+func TestApplyBatch_Basic(t *testing.T) { // Checks that ApplyBatch applies hex-encoded write batches and replies with each batch's byte length.
+       srv := util.StartServer(t, map[string]string{}) // no configuration overrides are passed
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("Make sure the apply batch command works", func(t *testing.T) {
+               // Hex dump of a serialized write batch equivalent to: SET a 1
+               batch, err := 
hex.DecodeString("04000000000000000100000003013105010D0B5F5F6E616D6573706163656106010000000031")
+               require.NoError(t, err)
+               r := rdb.Do(ctx, "ApplyBatch", string(batch)) // decoded bytes are sent as the single batch argument
+               val, err := r.Int64()
+               require.NoError(t, err)
+               require.EqualValues(t, len(batch), val) // reply must equal the batch's byte length
+               require.Equal(t, "1", rdb.Get(ctx, "a").Val()) // the write must be visible through GET
+
+               // Hex dump of a serialized write batch equivalent to: HSET hash field value
+               batch, err = 
hex.DecodeString("05000000000000000200000003013201210B5F5F6E616D65737061636500000004686173683076F331696342A76669656C640576616C75650501100B5F5F6E616D657370616365686173681102000000003076F331696342A700000002")
+               require.NoError(t, err)
+               r = rdb.Do(ctx, "ApplyBatch", string(batch))
+               val, err = r.Int64()
+               require.NoError(t, err)
+               require.EqualValues(t, len(batch), val) // reply must equal the batch's byte length
+               require.Equal(t, "value", rdb.HGet(ctx, "hash", "field").Val()) // the write must be visible through HGET
+       })
+}

Reply via email to