This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 275ab320 feat(command): Implement the new command POLLUPDATES for
polling updates by sequence (#2472)
275ab320 is described below
commit 275ab32054faa72c59c2b40b4eff69a8011a6bbc
Author: hulk <[email protected]>
AuthorDate: Fri Aug 9 09:56:20 2024 +0800
feat(command): Implement the new command POLLUPDATES for polling updates by
sequence (#2472)
As proposed in issue #2469, we would like to add a new command for
polling updates from Kvrocks. The main purpose is to make it possible to
implement features like CDC (Change Data Capture) without running an agent
alongside Kvrocks instances, which makes deployments easier to operate.
The following is the command format:
```shell
POLLUPDATES <Sequence Number> [MAX <N>] [STRICT] [FORMAT <RAW>]
```
- `Sequence Number` is required. It is the last sequence number the caller
has already consumed. Polling starts from the next one (`Sequence Number + 1`),
so the same update is not returned twice.
- `MAX` is optional and limits the number of batches returned. It must be
between 1 and 1000 and defaults to `16`.
- `STRICT` is optional. When it is set, the first returned update MUST start
exactly at the requested sequence. `GetUpdatesSince` returns the first
available sequence if the requested one does not exist, so this flag lets
users require an exact match instead.
- `FORMAT` is optional. `RAW` (hex-encoded write batches) is currently the
only supported format.
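The output contains the following sections:
- latest_sequence: the latest sequence number of the DB
- updates: the polled write batches
- next_sequence: the cursor to pass as `Sequence Number` in the next call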
For example, assuming the DB's latest sequence is 100, the command
`POLLUPDATES 90 MAX 3 FORMAT RAW` returns the following
response:
```shell
"latest_sequence"
100
"updates"
{batch-0}
{batch-1}
{batch-2}
"next_sequence"
93
```
This will close #2469
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_server.cc | 58 ++++++++-
src/server/redis_connection.h | 1 +
src/server/redis_reply.cc | 9 ++
src/server/redis_reply.h | 2 +
src/server/server.cc | 30 +++++
src/server/server.h | 1 +
tests/gocase/unit/server/poll_updates_test.go | 169 ++++++++++++++++++++++++++
7 files changed, 269 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d595875d..ac9179e7 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1265,6 +1265,61 @@ class CommandDump : public Commander {
}
};
+class CommandPollUpdates : public Commander {
+ public:
+  // POLLUPDATES <sequence> [MAX <N>] [STRICT] [FORMAT RAW]
+  //
+  // <sequence> is the last sequence number the caller has already consumed
+  // (e.g. the `next_sequence` of a previous reply); polling starts right after it.
+  // MAX accepts 1..1000 and defaults to 16; RAW is the only supported FORMAT.
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 1);
+    sequence_ = GET_OR_RET(parser.TakeInt<uint64_t>());
+    // Execute() polls from sequence_ + 1. For the maximum uint64 value that would
+    // wrap around to 0 and silently replay the WAL from its oldest entry.
+    if (sequence_ + 1 == 0) {
+      return {Status::RedisParseErr, "sequence number is out of range"};
+    }
+
+    while (parser.Good()) {
+      if (parser.EatEqICase("MAX")) {
+        max_ = GET_OR_RET(parser.TakeInt<int64_t>(NumericRange<int64_t>{1, 1000}));
+      } else if (parser.EatEqICase("STRICT")) {
+        is_strict_ = true;
+      } else if (parser.EatEqICase("FORMAT")) {
+        auto format = GET_OR_RET(parser.TakeStr());
+        if (util::EqualICase(format, "RAW")) {
+          format_ = Format::Raw;
+        } else {
+          return {Status::RedisParseErr, "invalid FORMAT option, only support RAW"};
+        }
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+    return Status::OK();
+  }
+
+  // Replies with a map of three entries:
+  //   latest_sequence - the storage's latest sequence number
+  //   updates         - the polled write batches, each hex-encoded
+  //   next_sequence   - the cursor for the next POLLUPDATES call; it equals the
+  //                     requested sequence when no updates were returned
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    uint64_t next_sequence = sequence_;
+    // sequence + 1 is for excluding the current sequence to avoid getting duplicate updates
+    auto batches = GET_OR_RET(srv->PollUpdates(sequence_ + 1, max_, is_strict_));
+    std::string updates = redis::MultiLen(batches.size());
+    for (const auto &batch : batches) {
+      updates += redis::BulkString(util::StringToHex(batch.writeBatchPtr->Data()));
+      // A batch may cover several sequence numbers, so the cursor is the last
+      // sequence number of the last returned batch.
+      next_sequence = batch.sequence + batch.writeBatchPtr->Count() - 1;
+    }
+
+    *output = conn->Map({
+        {redis::BulkString("latest_sequence"), redis::Integer(srv->storage->LatestSeqNumber())},
+        {redis::BulkString("updates"), std::move(updates)},
+        {redis::BulkString("next_sequence"), redis::Integer(next_sequence)},
+    });
+    return Status::OK();
+  }
+
+ private:
+  enum class Format {
+    Raw,
+  };
+
+  // Always assigned by Parse() before Execute() runs.
+  uint64_t sequence_ = 0;
+  bool is_strict_ = false;
+  int64_t max_ = 16;
+  Format format_ = Format::Raw;
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0,
0, 0),
MakeCmdAttr<CommandSelect>("select", 2, "read-only",
0, 0, 0),
@@ -1302,5 +1357,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth",
2, "read-only ok-loadin
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive",
0, 0, 0),
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading
multi no-script pub-sub", 0, 0, 0),
MakeCmdAttr<CommandApplyBatch>("applybatch", -2,
"write no-multi", 0, 0, 0),
- MakeCmdAttr<CommandDump>("dump", 2, "read-only", 0, 0,
0), )
+ MakeCmdAttr<CommandDump>("dump", 2, "read-only", 0, 0,
0),
+ MakeCmdAttr<CommandPollUpdates>("pollupdates", -2,
"read-only", 0, 0, 0), )
} // namespace redis
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 78e4a67c..83c57c01 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -96,6 +96,7 @@ class Connection : public EvbufCallbackBase<Connection> {
std::string MapOfBulkStrings(const std::vector<std::string> &elems) const {
return redis::MapOfBulkStrings(protocol_version_, elems);
}
+  // Builds a map reply for this connection's RESP protocol version by forwarding
+  // to redis::Map; keys and values must already be RESP-encoded.
+  std::string Map(const std::map<std::string, std::string> &map) const { return redis::Map(protocol_version_, map); }
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string HeaderOfAttribute(T len) const {
return redis::HeaderOfAttribute(len);
diff --git a/src/server/redis_reply.cc b/src/server/redis_reply.cc
index 04e8bbd8..d01c823a 100644
--- a/src/server/redis_reply.cc
+++ b/src/server/redis_reply.cc
@@ -124,4 +124,13 @@ std::string MapOfBulkStrings(RESP ver, const
std::vector<std::string> &elems) {
return result;
}
+// Encodes `map` as a map reply for protocol `ver`: a RESP3 map header, or (per
+// HeaderOfMap) a flat array of 2 * size elements otherwise. Keys and values are
+// appended verbatim, so callers must pass already RESP-encoded strings. Entries are
+// emitted in std::map order, i.e. sorted by the encoded key bytes, not in the
+// order the caller listed them.
+std::string Map(RESP ver, const std::map<std::string, std::string> &map) {
+  std::string reply = HeaderOfMap(ver, map.size());
+  for (const auto &[encoded_key, encoded_value] : map) {
+    reply.append(encoded_key).append(encoded_value);
+  }
+  return reply;
+}
+
} // namespace redis
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index ceb6042e..9442e8c2 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -22,6 +22,7 @@
#include <event2/buffer.h>
+#include <map>
#include <string>
#include <vector>
@@ -98,6 +99,7 @@ template <typename T, std::enable_if_t<std::is_integral_v<T>,
int> = 0>
std::string HeaderOfMap(RESP ver, T len) {
return ver == RESP::v3 ? "%" + std::to_string(len) + CRLF : MultiLen(len *
2);
}
+// Encodes a map whose keys and values are already RESP-encoded as a map reply for `ver`.
+std::string Map(RESP ver, const std::map<std::string, std::string> &map);
std::string MapOfBulkStrings(RESP ver, const std::vector<std::string> &elems);
template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
std::string HeaderOfAttribute(T len) {
diff --git a/src/server/server.cc b/src/server/server.cc
index 5e3b7f13..7e4c4f2d 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1556,6 +1556,36 @@ int64_t Server::GetLastScanTime(const std::string &ns)
const {
return 0;
}
+// Reads up to `count` write batches from the WAL, beginning at `next_sequence`.
+//
+// Returns an empty vector when `next_sequence` is exactly one past the latest
+// sequence (the caller is caught up), and an error when it is further ahead.
+// The WAL iterator is not guaranteed to start exactly at `next_sequence`; with
+// `is_strict`, a first batch that starts elsewhere is reported as an error.
+// Reading stops early, keeping the batches gathered so far, once the iterator
+// becomes invalid or reports a non-OK status.
+StatusOr<std::vector<rocksdb::BatchResult>> Server::PollUpdates(uint64_t next_sequence, int64_t count,
+                                                                bool is_strict) const {
+  std::vector<rocksdb::BatchResult> result;
+  const uint64_t first_unwritten = storage->LatestSeqNumber() + 1;
+  if (next_sequence > first_unwritten) return {Status::NotOK, "next sequence is out of range"};
+  if (next_sequence == first_unwritten) return result;  // nothing new yet
+
+  std::unique_ptr<rocksdb::TransactionLogIterator> wal_iter;
+  if (auto s = storage->GetWALIter(next_sequence, &wal_iter); !s.IsOK()) return s;
+  if (wal_iter == nullptr) return Status{Status::NotOK, "unable to get WAL iterator"};
+
+  int64_t taken = 0;
+  while (taken < count && wal_iter->Valid() && wal_iter->status().ok()) {
+    auto batch = wal_iter->GetBatch();
+    // Only the first batch is checked: later batches naturally follow it.
+    if (is_strict && taken == 0 && batch.sequence != next_sequence) {
+      return {Status::NotOK,
+              fmt::format("mismatched sequence number, expected {} but got {}", next_sequence, batch.sequence)};
+    }
+    result.emplace_back(std::move(batch));
+    ++taken;
+    wal_iter->Next();
+  }
+  return result;
+}
+
void Server::SlowlogPushEntryIfNeeded(const std::vector<std::string> *args,
uint64_t duration,
const redis::Connection *conn) {
int64_t threshold = config_->slowlog_log_slower_than;
diff --git a/src/server/server.h b/src/server/server.h
index c1793e81..1bb639ba 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -253,6 +253,7 @@ class Server {
Status AsyncScanDBSize(const std::string &ns);
void GetLatestKeyNumStats(const std::string &ns, KeyNumStats *stats);
int64_t GetLastScanTime(const std::string &ns) const;
+  // Returns up to `count` WAL write batches read from `next_sequence`. An empty
+  // result means there is nothing past the latest sequence yet; a `next_sequence`
+  // beyond that is an error. With `is_strict`, fails unless the first batch starts
+  // exactly at `next_sequence`.
+  StatusOr<std::vector<rocksdb::BatchResult>> PollUpdates(uint64_t next_sequence, int64_t count, bool is_strict) const;
std::string GenerateCursorFromKeyName(const std::string &key_name,
CursorType cursor_type, const char *prefix = "");
std::string GetKeyNameFromCursor(const std::string &cursor, CursorType
cursor_type);
diff --git a/tests/gocase/unit/server/poll_updates_test.go
b/tests/gocase/unit/server/poll_updates_test.go
new file mode 100644
index 00000000..dee87523
--- /dev/null
+++ b/tests/gocase/unit/server/poll_updates_test.go
@@ -0,0 +1,169 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+ */
+
+package server
+
+import (
+ "context"
+ "encoding/hex"
+ "fmt"
+ "strconv"
+ "testing"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+// PollUpdatesResult is the decoded form of a POLLUPDATES reply.
+type PollUpdatesResult struct {
+	// LatestSeq is the "latest_sequence" entry of the reply.
+	LatestSeq int64
+	// NextSeq is the "next_sequence" entry, passed back to the next POLLUPDATES call.
+	NextSeq int64
+	// Updates holds the hex-encoded write batches from the "updates" entry.
+	Updates []string
+}
+
+// sliceToPollUpdatesResult converts a POLLUPDATES reply, read as a flat
+// key/value slice, into a PollUpdatesResult. It fails the test (instead of
+// panicking) if the reply does not hold exactly three key/value pairs, if a key
+// is unknown, or if a value has an unexpected type.
+func sliceToPollUpdatesResult(t *testing.T, slice []interface{}) *PollUpdatesResult {
+	itemCount := 6
+	require.Len(t, slice, itemCount)
+	var latestSeq, nextSeq int64
+
+	updates := make([]string, 0)
+	for i := 0; i < itemCount; i += 2 {
+		key, ok := slice[i].(string)
+		require.True(t, ok, "reply key %v is not a string", slice[i])
+		switch key {
+		case "latest_sequence":
+			latestSeq, ok = slice[i+1].(int64)
+			require.True(t, ok, "latest_sequence %v is not an integer", slice[i+1])
+		case "next_sequence":
+			nextSeq, ok = slice[i+1].(int64)
+			require.True(t, ok, "next_sequence %v is not an integer", slice[i+1])
+		case "updates":
+			fields, ok := slice[i+1].([]interface{})
+			require.True(t, ok, "updates %v is not an array", slice[i+1])
+			for _, field := range fields {
+				str, ok := field.(string)
+				require.True(t, ok)
+				updates = append(updates, str)
+			}
+		default:
+			require.Fail(t, fmt.Sprintf("unknown key: %s", key))
+		}
+	}
+
+	return &PollUpdatesResult{
+		LatestSeq: latestSeq,
+		NextSeq:   nextSeq,
+		Updates:   updates,
+	}
+}
+
+// TestPollUpdates_Basic polls a fresh server's updates in two pages and replays
+// them on a second server, then checks that invalid arguments are rejected.
+func TestPollUpdates_Basic(t *testing.T) {
+	ctx := context.Background()
+
+	srv0 := util.StartServer(t, map[string]string{})
+	defer srv0.Close()
+	rdb0 := srv0.NewClient()
+	defer func() { require.NoError(t, rdb0.Close()) }()
+
+	srv1 := util.StartServer(t, map[string]string{})
+	defer srv1.Close()
+	rdb1 := srv1.NewClient()
+	defer func() { require.NoError(t, rdb1.Close()) }()
+
+	t.Run("Make sure the command POLLUPDATES works well", func(t *testing.T) {
+		// The expected sequence numbers below rely on every write succeeding,
+		// so a failed SET must fail the test right here.
+		for i := 0; i < 10; i++ {
+			require.NoError(t, rdb0.Set(ctx, fmt.Sprintf("key-%d", i), i, 0).Err())
+		}
+
+		updates := make([]string, 0)
+		slice, err := rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", 6).Slice()
+		require.NoError(t, err)
+		pollUpdates := sliceToPollUpdatesResult(t, slice)
+		require.EqualValues(t, 10, pollUpdates.LatestSeq)
+		require.EqualValues(t, 6, pollUpdates.NextSeq)
+		require.Len(t, pollUpdates.Updates, 6)
+		updates = append(updates, pollUpdates.Updates...)
+
+		// Continue from the returned cursor; only the remaining 4 updates are left.
+		slice, err = rdb0.Do(ctx, "POLLUPDATES", pollUpdates.NextSeq, "MAX", 6).Slice()
+		require.NoError(t, err)
+		pollUpdates = sliceToPollUpdatesResult(t, slice)
+		require.EqualValues(t, 10, pollUpdates.LatestSeq)
+		require.EqualValues(t, 10, pollUpdates.NextSeq)
+		require.Len(t, pollUpdates.Updates, 4)
+		updates = append(updates, pollUpdates.Updates...)
+
+		// Replaying the polled batches on the second server reproduces the data.
+		for i := 0; i < 10; i++ {
+			batch, err := hex.DecodeString(updates[i])
+			require.NoError(t, err)
+			applied, err := rdb1.Do(ctx, "APPLYBATCH", batch).Bool()
+			require.NoError(t, err)
+			require.True(t, applied)
+			require.EqualValues(t, strconv.Itoa(i), rdb1.Get(ctx, fmt.Sprintf("key-%d", i)).Val())
+		}
+	})
+
+	t.Run("Runs POLLUPDATES with invalid arguments", func(t *testing.T) {
+		require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", -1).Err(),
+			"ERR out of numeric range")
+		require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", 1001).Err(),
+			"ERR out of numeric range")
+		require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 0, "FORMAT", "COMMAND").Err(),
+			"ERR invalid FORMAT option, only support RAW")
+		require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 12, "FORMAT", "RAW").Err(),
+			"ERR next sequence is out of range")
+		require.Error(t, rdb0.Do(ctx, "POLLUPDATES", 1, "FORMAT", "EXTRA").Err())
+	})
+}
+
+// TestPollUpdates_WithStrict checks that STRICT rejects a poll whose first
+// returned batch does not start at the requested position, that the same poll
+// succeeds without STRICT, and that the strictly polled batches replay correctly.
+func TestPollUpdates_WithStrict(t *testing.T) {
+	ctx := context.Background()
+
+	srv0 := util.StartServer(t, map[string]string{})
+	defer srv0.Close()
+	rdb0 := srv0.NewClient()
+	defer func() { require.NoError(t, rdb0.Close()) }()
+
+	srv1 := util.StartServer(t, map[string]string{})
+	defer srv1.Close()
+	rdb1 := srv1.NewClient()
+	defer func() { require.NoError(t, rdb1.Close()) }()
+
+	// The latest sequence is 2 after running the HSET command, 1 for the metadata and 1 for the field
+	require.NoError(t, rdb0.HSet(ctx, "h0", "f0", "v0").Err())
+	// The latest sequence is 3 after running the SET command
+	require.NoError(t, rdb0.Set(ctx, "k0", "v0", 0).Err())
+
+	// PollUpdates with strict mode should return an error if the sequence number is mismatched.
+	// POLLUPDATES 1 polls from sequence 2, which lies inside the HSET batch (sequences 1-2),
+	// so the first batch returned starts at 1 instead of 2.
+	err := rdb0.Do(ctx, "POLLUPDATES", 1, "MAX", 1, "STRICT").Err()
+	require.ErrorContains(t, err, "ERR mismatched sequence number")
+
+	// Works well if the sequence number is mismatched but not in strict mode
+	require.NoError(t, rdb0.Do(ctx, "POLLUPDATES", 1, "MAX", 1).Err())
+
+	// Polling from 0 in strict mode returns both batches (HSET and SET), and the
+	// cursor ends at the latest sequence.
+	slice, err := rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", 10, "STRICT").Slice()
+	require.NoError(t, err)
+	pollUpdates := sliceToPollUpdatesResult(t, slice)
+	require.EqualValues(t, 3, pollUpdates.LatestSeq)
+	require.EqualValues(t, 3, pollUpdates.NextSeq)
+	require.Len(t, pollUpdates.Updates, 2)
+
+	// Replaying the raw batches on the second server reproduces both keys.
+	for _, update := range pollUpdates.Updates {
+		batch, err := hex.DecodeString(update)
+		require.NoError(t, err)
+		require.NoError(t, rdb1.Do(ctx, "APPLYBATCH", batch).Err())
+	}
+
+	require.Equal(t, "v0", rdb1.Get(ctx, "k0").Val())
+	require.Equal(t, "v0", rdb1.HGet(ctx, "h0", "f0").Val())
+}