This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 9d656f14 Add the ApplyBatch command for data migration scenario (#2010)
9d656f14 is described below
commit 9d656f1476583651e2fae20ed1c37cf35d0679ca
Author: Myth <[email protected]>
AuthorDate: Sun Jan 14 15:59:07 2024 +0800
Add the ApplyBatch command for data migration scenario (#2010)
Co-authored-by: git-hulk <[email protected]>
---
src/commands/cmd_server.cc | 36 ++++++++++++++-
src/storage/storage.cc | 8 ++--
src/storage/storage.h | 1 +
tests/gocase/unit/applybatch/applybatch_test.go | 58 +++++++++++++++++++++++++
4 files changed, 99 insertions(+), 4 deletions(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d921c766..e0700af8 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1255,6 +1255,39 @@ class CommandReset : public Commander {
}
};
+class CommandApplyBatch : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ raw_batch_ = args[1];
+ if (args.size() > 2) {
+ if (args.size() > 3) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ if (!util::EqualICase(args[2], "lowpri")) {
+ return {Status::RedisParseErr, "only support LOWPRI option"};
+ }
+ low_pri_ = true;
+ }
+ return Commander::Parse(args);
+ }
+
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ size_t size = raw_batch_.size();
+ auto options = svr->storage->DefaultWriteOptions();
+ options.low_pri = low_pri_;
+ auto s = svr->storage->ApplyWriteBatch(options, std::move(raw_batch_));
+ if (!s.IsOK()) {
+ return {Status::RedisExecErr, s.Msg()};
+ }
+ *output = redis::Integer(size);
+ return Status::OK();
+ }
+
+ private:
+ std::string raw_batch_;
+ bool low_pri_ = false;
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0,
0, 0),
MakeCmdAttr<CommandSelect>("select", 2, "read-only",
0, 0, 0),
@@ -1291,5 +1324,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth",
2, "read-only ok-loadin
MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0,
0, 0),
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive",
0, 0, 0),
MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0,
0),
- MakeCmdAttr<CommandReset>("reset", -1, "multi
pub-sub", 0, 0, 0), )
+ MakeCmdAttr<CommandReset>("reset", -1, "multi
pub-sub", 0, 0, 0),
+ MakeCmdAttr<CommandApplyBatch>("applybatch", -2,
"write no-multi", 0, 0, 0), )
} // namespace redis
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 08e79076..a600813f 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -661,16 +661,18 @@ rocksdb::Status Storage::FlushScripts(const
rocksdb::WriteOptions &options, rock
}
Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
+ return ApplyWriteBatch(write_opts_, std::move(raw_batch));
+}
+
+Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options,
std::string &&raw_batch) {
if (db_size_limit_reached_) {
return {Status::NotOK, "reach space limit"};
}
-
auto batch = rocksdb::WriteBatch(std::move(raw_batch));
- auto s = db_->Write(write_opts_, &batch);
+ auto s = db_->Write(options, &batch);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
-
return Status::OK();
}
diff --git a/src/storage/storage.h b/src/storage/storage.h
index f0134cbf..7e37d82f 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -132,6 +132,7 @@ class Storage {
Status RestoreFromCheckpoint();
Status GetWALIter(rocksdb::SequenceNumber seq,
std::unique_ptr<rocksdb::TransactionLogIterator> *iter);
Status ReplicaApplyWriteBatch(std::string &&raw_batch);
+ Status ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string
&&raw_batch);
rocksdb::SequenceNumber LatestSeqNumber();
[[nodiscard]] rocksdb::Status Get(const rocksdb::ReadOptions &options, const
rocksdb::Slice &key, std::string *value);
diff --git a/tests/gocase/unit/applybatch/applybatch_test.go
b/tests/gocase/unit/applybatch/applybatch_test.go
new file mode 100644
index 00000000..275b8663
--- /dev/null
+++ b/tests/gocase/unit/applybatch/applybatch_test.go
@@ -0,0 +1,58 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package applybatch
+
+import (
+ "context"
+ "encoding/hex"
+ "testing"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+func TestApplyBatch_Basic(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{})
+ defer srv.Close()
+
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("Make sure the apply batch command works", func(t *testing.T) {
+ // SET a 1
+ batch, err :=
hex.DecodeString("04000000000000000100000003013105010D0B5F5F6E616D6573706163656106010000000031")
+ require.NoError(t, err)
+ r := rdb.Do(ctx, "ApplyBatch", string(batch))
+ val, err := r.Int64()
+ require.NoError(t, err)
+ require.EqualValues(t, len(batch), val)
+ require.Equal(t, "1", rdb.Get(ctx, "a").Val())
+
+ // HSET hash field value
+ batch, err =
hex.DecodeString("05000000000000000200000003013201210B5F5F6E616D65737061636500000004686173683076F331696342A76669656C640576616C75650501100B5F5F6E616D657370616365686173681102000000003076F331696342A700000002")
+ require.NoError(t, err)
+ r = rdb.Do(ctx, "ApplyBatch", string(batch))
+ val, err = r.Int64()
+ require.NoError(t, err)
+ require.EqualValues(t, len(batch), val)
+ require.Equal(t, "value", rdb.HGet(ctx, "hash", "field").Val())
+ })
+}