git-hulk commented on code in PR #3378:
URL: https://github.com/apache/kvrocks/pull/3378#discussion_r2869015957
##########
src/server/server.cc:
##########
@@ -994,10 +995,100 @@ void Server::cron() {
}
CleanupExitedSlaves();
+
+ // CLIENT PAUSE timeout check
+ if (client_pause_end_time_ != 0) {
+ uint64_t now_ms = util::GetTimeStampMS();
+ if (now_ms >= client_pause_end_time_) {
+ ClientPauseUnpause();
+ }
+ }
+
recordInstantaneousMetrics();
}
}
+void Server::SetClientPause(uint64_t end_time_ms, PauseMode mode) {
Review Comment:
This could be named `PauseClients`.
##########
tests/gocase/unit/client/client_pause_test.go:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/redis/go-redis/v9"
+ "github.com/stretchr/testify/require"
+)
+
+func TestClientPause(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{
+ "requirepass": "admin",
+ })
+ defer srv.Close()
+
+ ctx := context.Background()
+
+ adminClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, adminClient.Close()) }()
+
+ // unpauseClient is a separate connection used to send CLIENT UNPAUSE.
+ // It must be a different connection from the one being paused, because
+ // the paused connection has its read events disabled and cannot receive
+ // any new commands until it is resumed.
+ unpauseClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, unpauseClient.Close()) }()
+
+ t.Run("CLIENT PAUSE blocks write commands in ALL mode", func(t
*testing.T) {
+ require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE",
"500").Err())
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ start := time.Now()
+ go func() {
+ defer wg.Done()
+ writeClient :=
srv.NewClientWithOption(&redis.Options{Password: "admin"})
+ defer func() { require.NoError(t, writeClient.Close())
}()
+ require.NoError(t, writeClient.Set(ctx, "k1", "v1",
0).Err())
+ }()
+ wg.Wait()
+ require.GreaterOrEqual(t, time.Since(start).Milliseconds(),
int64(400))
+
+ require.NoError(t, unpauseClient.Do(ctx, "CLIENT",
"UNPAUSE").Err())
+ })
+
+ t.Run("CLIENT UNPAUSE releases paused clients immediately", func(t
*testing.T) {
+ require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE",
"10000").Err())
+
+ writeClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, writeClient.Close()) }()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ start := time.Now()
+ go func() {
+ defer wg.Done()
+ require.NoError(t, writeClient.Set(ctx, "k2", "v2",
0).Err())
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+ require.NoError(t, unpauseClient.Do(ctx, "CLIENT",
"UNPAUSE").Err())
Review Comment:
We can check the value of key `k2` to confirm that it is null before
`UNPAUSE` and that it is `v2` after `UNPAUSE`.
##########
src/server/server.cc:
##########
@@ -994,10 +995,100 @@ void Server::cron() {
}
CleanupExitedSlaves();
+
+ // CLIENT PAUSE timeout check
+ if (client_pause_end_time_ != 0) {
+ uint64_t now_ms = util::GetTimeStampMS();
+ if (now_ms >= client_pause_end_time_) {
+ ClientPauseUnpause();
+ }
+ }
+
recordInstantaneousMetrics();
}
}
+void Server::SetClientPause(uint64_t end_time_ms, PauseMode mode) {
+ std::lock_guard<std::mutex> lock(client_pause_mu_);
+ client_pause_end_time_ = end_time_ms;
+ client_pause_mode_ = mode;
+}
+
+void Server::ClientPauseUnpause() {
Review Comment:
This should be `UnpauseClients`?
##########
src/server/redis_connection.cc:
##########
@@ -226,6 +227,21 @@ bool Connection::CanMigrate() const {
&& subscribe_channels_.empty() && subscribe_patterns_.empty(); //
not subscribing any channel
}
+void Connection::SuspendForPause() {
Review Comment:
It would be better to use `Pause` and `Unpause` without introducing a new
term.
##########
src/server/redis_connection.cc:
##########
@@ -226,6 +227,21 @@ bool Connection::CanMigrate() const {
&& subscribe_channels_.empty() && subscribe_patterns_.empty(); //
not subscribing any channel
}
+void Connection::SuspendForPause() {
+ if (is_paused_) return;
+ is_paused_ = true;
+ bufferevent_disable(bev_, EV_READ);
+}
+
+void Connection::ResumeFromPause() {
Review Comment:
See the above comment
##########
src/server/worker.cc:
##########
@@ -404,8 +410,17 @@ void Worker::MigrateConnection(Worker *target,
redis::Connection *conn) {
}
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
- bufferevent_enable(bev, EV_READ | EV_WRITE);
conn->SetOwner(target);
+ // Update the worker pointer in paused_conns_ before re-enabling events so
Review Comment:
I think we can simplify this by disallowing a change of worker thread
while the connection is in paused mode.
##########
src/server/server.cc:
##########
@@ -994,10 +995,100 @@ void Server::cron() {
}
CleanupExitedSlaves();
+
+ // CLIENT PAUSE timeout check
+ if (client_pause_end_time_ != 0) {
+ uint64_t now_ms = util::GetTimeStampMS();
+ if (now_ms >= client_pause_end_time_) {
+ ClientPauseUnpause();
+ }
+ }
Review Comment:
```suggestion
if (client_pause_end_time_ != 0 && util::GetTimeStampMS() >
client_pause_end_time_) {
ClientPauseUnpause();
}
```
##########
tests/gocase/unit/client/client_pause_test.go:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/redis/go-redis/v9"
+ "github.com/stretchr/testify/require"
+)
+
+func TestClientPause(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{
+ "requirepass": "admin",
+ })
+ defer srv.Close()
+
+ ctx := context.Background()
+
+ adminClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, adminClient.Close()) }()
+
+ // unpauseClient is a separate connection used to send CLIENT UNPAUSE.
+ // It must be a different connection from the one being paused, because
+ // the paused connection has its read events disabled and cannot receive
+ // any new commands until it is resumed.
+ unpauseClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, unpauseClient.Close()) }()
+
+ t.Run("CLIENT PAUSE blocks write commands in ALL mode", func(t
*testing.T) {
+ require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE",
"500").Err())
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ start := time.Now()
+ go func() {
+ defer wg.Done()
+ writeClient :=
srv.NewClientWithOption(&redis.Options{Password: "admin"})
+ defer func() { require.NoError(t, writeClient.Close())
}()
+ require.NoError(t, writeClient.Set(ctx, "k1", "v1",
0).Err())
+ }()
+ wg.Wait()
+ require.GreaterOrEqual(t, time.Since(start).Milliseconds(),
int64(400))
+
+ require.NoError(t, unpauseClient.Do(ctx, "CLIENT",
"UNPAUSE").Err())
+ })
+
+ t.Run("CLIENT UNPAUSE releases paused clients immediately", func(t
*testing.T) {
+ require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE",
"10000").Err())
+
+ writeClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, writeClient.Close()) }()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ start := time.Now()
+ go func() {
+ defer wg.Done()
+ require.NoError(t, writeClient.Set(ctx, "k2", "v2",
0).Err())
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+ require.NoError(t, unpauseClient.Do(ctx, "CLIENT",
"UNPAUSE").Err())
+ wg.Wait()
+ require.Less(t, time.Since(start).Milliseconds(), int64(5000))
+ })
+
+ t.Run("CLIENT PAUSE WRITE blocks write but not read commands", func(t
*testing.T) {
+ require.NoError(t, adminClient.Set(ctx, "readkey", "hello",
0).Err())
+ require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE",
"2000", "WRITE").Err())
+
+ // Read commands on a separate connection should complete
immediately.
+ readClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, readClient.Close()) }()
+ start := time.Now()
+ val, err := readClient.Get(ctx, "readkey").Result()
+ require.NoError(t, err)
+ require.Equal(t, "hello", val)
+ require.Less(t, time.Since(start).Milliseconds(), int64(1000))
+
+ require.NoError(t, unpauseClient.Do(ctx, "CLIENT",
"UNPAUSE").Err())
Review Comment:
We should also confirm that a write command is correctly paused in WRITE mode.
##########
src/server/server.cc:
##########
@@ -994,10 +995,100 @@ void Server::cron() {
}
CleanupExitedSlaves();
+
+ // CLIENT PAUSE timeout check
+ if (client_pause_end_time_ != 0) {
+ uint64_t now_ms = util::GetTimeStampMS();
+ if (now_ms >= client_pause_end_time_) {
+ ClientPauseUnpause();
+ }
+ }
+
recordInstantaneousMetrics();
}
}
+void Server::SetClientPause(uint64_t end_time_ms, PauseMode mode) {
+ std::lock_guard<std::mutex> lock(client_pause_mu_);
+ client_pause_end_time_ = end_time_ms;
+ client_pause_mode_ = mode;
+}
+
+void Server::ClientPauseUnpause() {
+ std::vector<PausedConnEntry> to_resume;
Review Comment:
to_resume => `paused_conns`
##########
tests/gocase/unit/client/client_pause_test.go:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/redis/go-redis/v9"
+ "github.com/stretchr/testify/require"
+)
+
+func TestClientPause(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{
+ "requirepass": "admin",
+ })
+ defer srv.Close()
+
+ ctx := context.Background()
+
+ adminClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, adminClient.Close()) }()
+
+ // unpauseClient is a separate connection used to send CLIENT UNPAUSE.
+ // It must be a different connection from the one being paused, because
+ // the paused connection has its read events disabled and cannot receive
+ // any new commands until it is resumed.
+ unpauseClient := srv.NewClientWithOption(&redis.Options{Password:
"admin"})
+ defer func() { require.NoError(t, unpauseClient.Close()) }()
+
+ t.Run("CLIENT PAUSE blocks write commands in ALL mode", func(t
*testing.T) {
+ require.NoError(t, adminClient.Do(ctx, "CLIENT", "PAUSE",
"500").Err())
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ start := time.Now()
+ go func() {
Review Comment:
Do we need to use a goroutine here?
##########
src/server/server.cc:
##########
@@ -994,10 +995,100 @@ void Server::cron() {
}
CleanupExitedSlaves();
+
+ // CLIENT PAUSE timeout check
+ if (client_pause_end_time_ != 0) {
+ uint64_t now_ms = util::GetTimeStampMS();
+ if (now_ms >= client_pause_end_time_) {
+ ClientPauseUnpause();
+ }
+ }
+
recordInstantaneousMetrics();
}
}
+void Server::SetClientPause(uint64_t end_time_ms, PauseMode mode) {
+ std::lock_guard<std::mutex> lock(client_pause_mu_);
+ client_pause_end_time_ = end_time_ms;
+ client_pause_mode_ = mode;
+}
+
+void Server::ClientPauseUnpause() {
+ std::vector<PausedConnEntry> to_resume;
+ {
+ std::lock_guard<std::mutex> lock(client_pause_mu_);
+ client_pause_end_time_ = 0;
+ client_pause_mode_ = PauseMode::kOff;
+ to_resume.swap(paused_conns_);
+ }
+ // Resume via the worker so fd+id validation under conns_mu_ serializes with
FreeConnection.
+ for (auto &entry : to_resume) {
+ entry.worker->ResumeConnectionFromPause(entry.fd, entry.id);
+ }
+}
+
+bool Server::PauseIfNeeded(redis::Connection *conn, const std::string
&cmd_name, uint64_t cmd_flags) {
Review Comment:
PauseIfNeeded => PauseConnIfNeeded
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]