This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 5cbc123e Add support of new command: RESET (#1999)
5cbc123e is described below

commit 5cbc123ee05f0b6dbd95b504ae262d00d03549be
Author: Nuo Xu <[email protected]>
AuthorDate: Sat Jan 13 19:26:19 2024 -0800

    Add support of new command: RESET (#1999)
    
    Co-authored-by: hulk <[email protected]>
---
 src/commands/cmd_server.cc            | 37 +++++++++++++++++--
 src/server/worker.cc                  | 10 ++++++
 src/server/worker.h                   |  1 +
 tests/gocase/unit/reset/reset_test.go | 67 +++++++++++++++++++++++++++++++++++
 4 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d94d81e7..d921c766 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1222,6 +1222,39 @@ class CommandAnalyze : public Commander {
   std::vector<std::string> command_args_;
 };
 
+class CommandReset : public Commander {  // RESET: clears per-connection state, then replies "RESET".
+ public:
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    // 1. Discards the current MULTI transaction block, if one exists.
+    if (conn->IsFlagEnabled(Connection::kMultiExec)) {
+      conn->ResetMultiExec();
+    }
+    // 2. Unwatches all keys WATCHed by the connection.
+    srv->ResetWatchedKeys(conn);
+    // 3. Disables CLIENT TRACKING, if in use. (not yet supported)
+    // 4. Sets the connection to READWRITE mode. (no code here; presumably nothing to undo - confirm)
+    // 5. Cancels the connection's ASKING mode, if previously set. (not yet supported)
+    // 6. Sets CLIENT REPLY to ON. (not yet supported)
+    // 9. Exits MONITOR mode, when applicable. (numbering skips 7 and 8; presumably mirrors the Redis RESET docs)
+    if (conn->IsFlagEnabled(Connection::kMonitor)) {
+      conn->Owner()->QuitMonitorConn(conn);
+    }
+    // 10. Aborts Pub/Sub's subscription state (SUBSCRIBE and PSUBSCRIBE), when appropriate.
+    if (conn->SubscriptionsCount() != 0) {
+      conn->UnsubscribeAll();
+    }
+    if (conn->PSubscriptionsCount() != 0) {
+      conn->PUnsubscribeAll();
+    }
+    // 11. Deauthenticates the connection, requiring a call to AUTH to reauthenticate when authentication is enabled.
+    conn->SetNamespace(kDefaultNamespace);
+    conn->BecomeAdmin();  // NOTE(review): step 11 says "deauthenticates" yet this grants admin - confirm intent.
+    // 12. Turns off NO-EVICT / NO-TOUCH mode. (not yet supported)
+    *output = redis::SimpleString("RESET");
+    return Status::OK();  // always succeeds; no step above reports an error
+  }
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only 
ok-loading", 0, 0, 0),
                         MakeCmdAttr<CommandPing>("ping", -1, "read-only", 0, 
0, 0),
                         MakeCmdAttr<CommandSelect>("select", 2, "read-only", 
0, 0, 0),
@@ -1257,6 +1290,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loadin
                         MakeCmdAttr<CommandSlaveOf>("slaveof", 3, "read-only 
exclusive no-script", 0, 0, 0),
                         MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 
0, 0),
                         MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", 
0, 0, 0),
-                        MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 
0), )
-
+                        MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 
0),
+                        MakeCmdAttr<CommandReset>("reset", -1, "multi 
pub-sub", 0, 0, 0), )
 }  // namespace redis
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 47042d03..1e8fed37 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -464,6 +464,16 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) {
   conn->EnableFlag(redis::Connection::kMonitor);
 }
 
+void Worker::QuitMonitorConn(redis::Connection *conn) {  // Takes conn out of monitor mode.
+  {
+    std::lock_guard<std::mutex> guard(conns_mu_);  // held only while the two fd-keyed containers are edited
+    monitor_conns_.erase(conn->GetFD());  // forget the fd as a monitor connection
+    conns_[conn->GetFD()] = conn;  // register it in conns_ (presumably undoing BecomeMonitorConn - confirm)
+  }
+  srv->DecrMonitorClientNum();  // server-side monitor counter, updated after the lock is released
+  conn->DisableFlag(redis::Connection::kMonitor);  // clear the flag that BecomeMonitorConn enables
+}
+
 void Worker::FeedMonitorConns(redis::Connection *conn, const std::string 
&response) {
   std::unique_lock<std::mutex> lock(conns_mu_);
 
diff --git a/src/server/worker.h b/src/server/worker.h
index a9f618e5..b6918ba9 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -62,6 +62,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
   Status EnableWriteEvent(int fd);
   Status Reply(int fd, const std::string &reply);
   void BecomeMonitorConn(redis::Connection *conn);
+  void QuitMonitorConn(redis::Connection *conn);  // clears conn's kMonitor state; defined in worker.cc
   void FeedMonitorConns(redis::Connection *conn, const std::string &response);
 
   std::string GetClientsStr();
diff --git a/tests/gocase/unit/reset/reset_test.go b/tests/gocase/unit/reset/reset_test.go
new file mode 100644
index 00000000..9d16edb1
--- /dev/null
+++ b/tests/gocase/unit/reset/reset_test.go
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package reset
+
+import (
+       "context"
+       "fmt"
+       "testing"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+func TestReset(t *testing.T) { // Exercises RESET against a live server: MULTI, WATCH and SUBSCRIBE state.
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient() // one client is shared by every subtest below
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("reset with ongoing txn", func(t *testing.T) {
+               require.NoError(t, rdb.Set(ctx, "x", "30", 0).Err())
+               require.NoError(t, rdb.Do(ctx, "multi").Err())
+               require.NoError(t, rdb.Set(ctx, "x", "40", 0).Err()) // issued after MULTI and never EXEC'd
+               require.NoError(t, rdb.Do(ctx, "reset").Err())
+
+               v1 := rdb.Do(ctx, "get", "x").Val() // RESET dropped the transaction, so x keeps its old value
+               require.Equal(t, "30", fmt.Sprintf("%v", v1))
+       })
+
+       t.Run("unwatch keys", func(t *testing.T) {
+               require.NoError(t, rdb.Set(ctx, "x", 30, 0).Err())
+               require.NoError(t, rdb.Do(ctx, "watch", "x").Err())
+               require.NoError(t, rdb.Do(ctx, "multi").Err())
+               require.NoError(t, rdb.Do(ctx, "ping").Err())
+               require.NoError(t, rdb.Do(ctx, "reset").Err())
+
+               require.NoError(t, rdb.Set(ctx, "x", 40, 0).Err()) // x changes after it was WATCHed above
+               require.NoError(t, rdb.Do(ctx, "multi").Err())
+               require.NoError(t, rdb.Do(ctx, "ping").Err())
+               require.Equal(t, []interface{}{"PONG"}, rdb.Do(ctx, "exec").Val()) // testify order: expected, actual
+       })
+
+       t.Run("unsub and punsub", func(t *testing.T) {
+               require.NoError(t, rdb.Do(ctx, "subscribe", "chan1").Err())
+               require.NoError(t, rdb.Do(ctx, "reset").Err())
+               // A count of 1 for chan2 shows the chan1 subscription did not survive RESET.
+               require.Equal(t, []interface{}{"subscribe", "chan2", int64(1)}, rdb.Do(ctx, "subscribe", "chan2").Val())
+       })
+}

Reply via email to