This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5cbc123e Add support of new command: RESET (#1999)
5cbc123e is described below
commit 5cbc123ee05f0b6dbd95b504ae262d00d03549be
Author: Nuo Xu <[email protected]>
AuthorDate: Sat Jan 13 19:26:19 2024 -0800
Add support of new command: RESET (#1999)
Co-authored-by: hulk <[email protected]>
---
src/commands/cmd_server.cc | 37 +++++++++++++++++--
src/server/worker.cc | 10 ++++++
src/server/worker.h | 1 +
tests/gocase/unit/reset/reset_test.go | 67 +++++++++++++++++++++++++++++++++++
4 files changed, 113 insertions(+), 2 deletions(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d94d81e7..d921c766 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1222,6 +1222,39 @@ class CommandAnalyze : public Commander {
std::vector<std::string> command_args_;
};
+class CommandReset : public Commander {
+ public:
+ Status Execute(Server *srv, Connection *conn, std::string *output) override {
+ // 1. Discards the current MULTI transaction block, if one exists.
+ if (conn->IsFlagEnabled(Connection::kMultiExec)) {
+ conn->ResetMultiExec();
+ }
+ // 2. Unwatches all keys WATCHed by the connection.
+ srv->ResetWatchedKeys(conn);
+ // 3. Disables CLIENT TRACKING, if in use. (not yet supported)
+ // 4. Sets the connection to READWRITE mode.
+ // 5. Cancels the connection's ASKING mode, if previously set. (not yet
supported)
+ // 6. Sets CLIENT REPLY to ON. (not yet supported)
+ // 9. Exits MONITOR mode, when applicable.
+ if (conn->IsFlagEnabled(Connection::kMonitor)) {
+ conn->Owner()->QuitMonitorConn(conn);
+ }
+ // 10. Aborts Pub/Sub's subscription state (SUBSCRIBE and PSUBSCRIBE),
when appropriate.
+ if (conn->SubscriptionsCount() != 0) {
+ conn->UnsubscribeAll();
+ }
+ if (conn->PSubscriptionsCount() != 0) {
+ conn->PUnsubscribeAll();
+ }
+ // 11. Deauthenticates the connection, requiring a call AUTH to
reauthenticate when authentication is enabled.
+ conn->SetNamespace(kDefaultNamespace);
+ conn->BecomeAdmin();
+ // 12. Turns off NO-EVICT / NO-TOUCH mode. (not yet supported)
+ *output = redis::SimpleString("RESET");
+ return Status::OK();
+ }
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only
ok-loading", 0, 0, 0),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0,
0, 0),
MakeCmdAttr<CommandSelect>("select", 2, "read-only",
0, 0, 0),
@@ -1257,6 +1290,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth",
2, "read-only ok-loadin
MakeCmdAttr<CommandSlaveOf>("slaveof", 3, "read-only
exclusive no-script", 0, 0, 0),
MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0,
0, 0),
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive",
0, 0, 0),
- MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0,
0), )
-
+ MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0,
0),
+ MakeCmdAttr<CommandReset>("reset", -1, "multi
pub-sub", 0, 0, 0), )
} // namespace redis
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 47042d03..1e8fed37 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -464,6 +464,16 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) {
conn->EnableFlag(redis::Connection::kMonitor);
}
+void Worker::QuitMonitorConn(redis::Connection *conn) {
+ {
+ std::lock_guard<std::mutex> guard(conns_mu_);
+ monitor_conns_.erase(conn->GetFD());
+ conns_[conn->GetFD()] = conn;
+ }
+ srv->DecrMonitorClientNum();
+ conn->DisableFlag(redis::Connection::kMonitor);
+}
+
void Worker::FeedMonitorConns(redis::Connection *conn, const std::string
&response) {
std::unique_lock<std::mutex> lock(conns_mu_);
diff --git a/src/server/worker.h b/src/server/worker.h
index a9f618e5..b6918ba9 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -62,6 +62,7 @@ class Worker : EventCallbackBase<Worker>,
EvconnlistenerBase<Worker> {
Status EnableWriteEvent(int fd);
Status Reply(int fd, const std::string &reply);
void BecomeMonitorConn(redis::Connection *conn);
+ void QuitMonitorConn(redis::Connection *conn);
void FeedMonitorConns(redis::Connection *conn, const std::string &response);
std::string GetClientsStr();
diff --git a/tests/gocase/unit/reset/reset_test.go
b/tests/gocase/unit/reset/reset_test.go
new file mode 100644
index 00000000..9d16edb1
--- /dev/null
+++ b/tests/gocase/unit/reset/reset_test.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package reset
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+func TestReset(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{})
+ defer srv.Close()
+
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("reset with ongoing txn", func(t *testing.T) {
+ require.NoError(t, rdb.Set(ctx, "x", "30", 0).Err())
+ require.NoError(t, rdb.Do(ctx, "multi").Err())
+ require.NoError(t, rdb.Set(ctx, "x", "40", 0).Err())
+ require.NoError(t, rdb.Do(ctx, "reset").Err())
+
+ v1 := rdb.Do(ctx, "get", "x").Val()
+ require.Equal(t, "30", fmt.Sprintf("%v", v1))
+ })
+
+ t.Run("unwatch keys", func(t *testing.T) {
+ require.NoError(t, rdb.Set(ctx, "x", 30, 0).Err())
+ require.NoError(t, rdb.Do(ctx, "watch", "x").Err())
+ require.NoError(t, rdb.Do(ctx, "multi").Err())
+ require.NoError(t, rdb.Do(ctx, "ping").Err())
+ require.NoError(t, rdb.Do(ctx, "reset").Err())
+
+ require.NoError(t, rdb.Set(ctx, "x", 40, 0).Err())
+ require.NoError(t, rdb.Do(ctx, "multi").Err())
+ require.NoError(t, rdb.Do(ctx, "ping").Err())
+ require.Equal(t, rdb.Do(ctx, "exec").Val(),
[]interface{}{"PONG"})
+ })
+
+ t.Run("unsub and punsub", func(t *testing.T) {
+ require.NoError(t, rdb.Do(ctx, "subscribe", "chan1").Err())
+ require.NoError(t, rdb.Do(ctx, "reset").Err())
+ require.Equal(t, rdb.Do(ctx, "subscribe", "chan2").Val(),
[]interface{}{"subscribe", "chan2", (int64)(1)})
+ })
+}