hubcio commented on code in PR #2964:
URL: https://github.com/apache/iggy/pull/2964#discussion_r3242573883
##########
foreign/go/client/tcp/tcp_core.go:
##########
@@ -256,49 +258,102 @@ func (c *IggyTcpClient) write(payload []byte) (int,
error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte,
error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
}
- return c.sendAndFetchResponse(data, cmd.Code())
+ return c.sendAndFetchResponse(ctx, data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(ctx context.Context, message
[]byte, command command.Code) ([]byte, error) {
+ if ctx == nil {
+ return nil, errors.New("nil context")
Review Comment:
`errors.New("nil context")` creates an ad-hoc error value, so callers have
nothing to match with `errors.Is`. Every other error path here returns a
matchable value (`context.Canceled`, `context.DeadlineExceeded`, `ierror.*`).
Make this a sentinel such as `ierror.ErrNilContext` for consistency at the SDK
boundary.
##########
foreign/go/client/tcp/tcp_core.go:
##########
@@ -256,49 +258,102 @@ func (c *IggyTcpClient) write(payload []byte) (int,
error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte,
error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
}
- return c.sendAndFetchResponse(data, cmd.Code())
+ return c.sendAndFetchResponse(ctx, data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(ctx context.Context, message
[]byte, command command.Code) ([]byte, error) {
+ if ctx == nil {
+ return nil, errors.New("nil context")
+ }
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
c.mtx.Lock()
defer c.mtx.Unlock()
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // deadlineMu makes sure that the deadline won't be set to now by the
+ // AfterFunc callback right after we call SetDeadline(time.Time{}) to
+ // clear the deadline after the operation is done.
+ var deadlineMu sync.Mutex
+
+ stop := context.AfterFunc(ctx, func() {
Review Comment:
The `context.AfterFunc` callback reads `c.conn` without holding `c.mtx`, and
`stop()` does not wait for an already-started callback: it can return false
while the callback goroutine is still running. Once `sendAndFetchResponse`
returns and the mutex is released, `connect()` can reassign `c.conn` at line
459, so the callback's read races with that write. Worse, the callback then
calls `SetDeadline(time.Now())` on whatever `c.conn` points to at that moment.
That can be a fresh, healthy connection, which gets poisoned with an immediate
timeout. Fix: snapshot `conn := c.conn` under the mutex before registering the
callback, and have the callback use the snapshot. A `ctx.Done() == nil`
fast-path also lets `context.Background()` callers skip the callback machinery
entirely.
##########
foreign/go/client/tcp/tcp_core.go:
##########
@@ -256,49 +258,102 @@ func (c *IggyTcpClient) write(payload []byte) (int,
error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte,
error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
}
- return c.sendAndFetchResponse(data, cmd.Code())
+ return c.sendAndFetchResponse(ctx, data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(ctx context.Context, message
[]byte, command command.Code) ([]byte, error) {
+ if ctx == nil {
+ return nil, errors.New("nil context")
+ }
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
c.mtx.Lock()
defer c.mtx.Unlock()
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // deadlineMu makes sure that the deadline won't be set to now by the
+ // AfterFunc callback right after we call SetDeadline(time.Time{}) to
+ // clear the deadline after the operation is done.
+ var deadlineMu sync.Mutex
+
+ stop := context.AfterFunc(ctx, func() {
+ deadlineMu.Lock()
+ defer deadlineMu.Unlock()
+ _ = c.conn.SetDeadline(time.Now())
+ })
+ defer stop()
+
+ clearDeadline := func() {
+ stop()
+ deadlineMu.Lock()
+ defer deadlineMu.Unlock()
+ _ = c.conn.SetDeadline(time.Time{})
+ }
+
+ // ioError returns ctx.Err() when the context caused the I/O failure,
+ // so callers get a consistent context.Canceled /
context.DeadlineExceeded
+ // instead of a low-level net timeout error.
+ ioError := func(err error) error {
+ if ctxErr := ctx.Err(); ctxErr != nil {
+ return ctxErr
+ }
+ return err
+ }
+
payload := createPayload(message, command)
if _, err := c.write(payload); err != nil {
- return nil, err
+ c.invalidateConnLocked()
+ return nil, ioError(err)
}
readBytes, buffer, err := c.read(ResponseInitialBytesLength)
if err != nil {
- return nil, fmt.Errorf("failed to read response for TCP
request: %w", err)
+ c.invalidateConnLocked()
+ return nil, ioError(err)
}
if readBytes != ResponseInitialBytesLength {
+ c.invalidateConnLocked()
return nil, fmt.Errorf("received an invalid or empty response:
%w", ierror.EmptyResponse{})
}
if status := ierror.Code(binary.LittleEndian.Uint32(buffer[0:4]));
status != 0 {
+ clearDeadline()
return nil, ierror.FromCode(status)
}
length := int(binary.LittleEndian.Uint32(buffer[4:]))
if length <= 1 {
+ clearDeadline()
return []byte{}, nil
}
_, buffer, err = c.read(length)
if err != nil {
- return nil, err
+ c.invalidateConnLocked()
+ return nil, ioError(err)
}
+ clearDeadline()
return buffer, nil
}
+// invalidateConnLocked closes the connection and marks it as disconnected
+func (c *IggyTcpClient) invalidateConnLocked() {
Review Comment:
`invalidateConnLocked` closes the conn and sets `StateDisconnected` on every
I/O error, but nothing on the operation path ever reconnects. `connect()` is
only called from `NewIggyTcpClient` and the login-after-redirect path, and the
heartbeat goroutine just logs ping failures. So after the first transient I/O
error the client is permanently dead: every later call writes to the closed
conn and fails forever. The `reconnection` config (maxRetries/interval) also
becomes dead code on this path. Before this PR, an I/O error left the conn
open and the state unchanged. Either wire up a reconnect trigger here, or
document this as a deliberate, breaking behavior change.
##########
foreign/go/client/tcp/tcp_core.go:
##########
@@ -256,49 +258,102 @@ func (c *IggyTcpClient) write(payload []byte) (int,
error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte,
error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
}
- return c.sendAndFetchResponse(data, cmd.Code())
+ return c.sendAndFetchResponse(ctx, data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(ctx context.Context, message
[]byte, command command.Code) ([]byte, error) {
+ if ctx == nil {
+ return nil, errors.New("nil context")
+ }
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
c.mtx.Lock()
defer c.mtx.Unlock()
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // deadlineMu makes sure that the deadline won't be set to now by the
+ // AfterFunc callback right after we call SetDeadline(time.Time{}) to
+ // clear the deadline after the operation is done.
+ var deadlineMu sync.Mutex
Review Comment:
`var deadlineMu sync.Mutex`, the `context.AfterFunc` registration, and the
`clearDeadline`/`ioError` closures are all allocated on every
`sendAndFetchResponse` call. That includes the hot `SendMessages`/`PollMessages`
path and `context.Background()` callers, whose `ctx.Done()` is nil and can
therefore never fire. A `ctx.Done() == nil` fast-path that skips the deadline
machinery entirely avoids this waste. As noted on the `AfterFunc` line, it also
keeps non-cancellable contexts clear of the `c.conn` race.
##########
foreign/go/client/tcp/tcp_session_management.go:
##########
@@ -37,53 +38,54 @@ func (c *IggyTcpClient) LoginUser(username string, password
string) (*iggcon.Ide
}
identity := binaryserialization.DeserializeLogInResponse(buffer)
- shouldRedirect, err := c.HandleLeaderRedirection()
+ shouldRedirect, err := c.HandleLeaderRedirection(ctx)
if err != nil {
return nil, err
}
if shouldRedirect {
if err = c.connect(); err != nil {
return nil, err
}
- return c.LoginUser(username, password)
+ return c.LoginUser(ctx, username, password)
Review Comment:
The same issue exists at line 71 in `LoginWithPersonalAccessToken`. The
recursion re-passes the same `ctx`. The first `c.do`'s `AfterFunc` callback can
still be live when `connect()` (line 46 / 68) reassigns `c.conn`. This is the
same `c.conn` race flagged in `tcp_core.go`, and here it is reachable even from
a single goroutine, with no concurrent callers. Fixing the callback to use a
conn snapshot covers this too.
##########
foreign/go/client/tcp/tcp_core_test.go:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tcp
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ iggcon "github.com/apache/iggy/foreign/go/contracts"
+ ierror "github.com/apache/iggy/foreign/go/errors"
+ "github.com/apache/iggy/foreign/go/internal/command"
+)
+
+// newTestClient creates an IggyTcpClient backed by an in-memory net.Pipe
connection.
+// Returns the client and the server-side end of the pipe; caller must close
the server conn.
+func newTestClient(t *testing.T) (*IggyTcpClient, net.Conn) {
+ t.Helper()
+ serverConn, clientConn := net.Pipe()
+ c := &IggyTcpClient{
+ conn: clientConn,
+ state: iggcon.StateConnected,
+ }
+ t.Cleanup(func() {
+ err := clientConn.Close()
+ if err != nil {
+ t.Errorf("error closing client connection: %v", err)
+ }
+ })
+ t.Cleanup(func() {
+ err := serverConn.Close()
+ if err != nil {
+ t.Errorf("error closing server connection: %v", err)
+ }
+ })
+ return c, serverConn
+}
+
+func TestSendAndFetchResponse_NilContext(t *testing.T) {
+ c, _ := newTestClient(t)
+ _, err := c.sendAndFetchResponse(nil, []byte{}, command.Code(0))
//nolint:staticcheck
+ if err == nil {
+ t.Fatal("expected error, got nil")
+ }
+ if !strings.Contains(err.Error(), "nil context") {
+ t.Errorf("got %q, want it to contain %q", err.Error(), "nil
context")
+ }
+}
+
+func TestSendAndFetchResponse_ContextErrors(t *testing.T) {
+ canceledCtx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ expiredCtx, expiredCancel := context.WithDeadline(context.Background(),
time.Now().Add(-time.Second))
+ defer expiredCancel()
+
+ tests := []struct {
+ name string
+ ctx context.Context
+ wantErr error
+ }{
+ {
+ name: "canceled",
+ ctx: canceledCtx,
+ wantErr: context.Canceled,
+ },
+ {
+ name: "expired",
+ ctx: expiredCtx,
+ wantErr: context.DeadlineExceeded,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c, _ := newTestClient(t)
+
+ // server does not respond, but it doesn't matter.
+ // ctx.Err() should fire before any I/O is attempted.
+ _, err := c.sendAndFetchResponse(tt.ctx, []byte{},
command.Code(0))
+ if err == nil {
+ t.Fatal("expected error, got nil")
+ }
+ if !errors.Is(err, tt.wantErr) {
+ t.Errorf("got %v, want %v", err, tt.wantErr)
+ }
+ })
+ }
+}
+
+func TestSendAndFetchResponse_DeadlineTimeout(t *testing.T) {
+ c, _ := newTestClient(t)
+
+ // server intentionally does not read or write, causing the client to
block
+ // until the context deadline fires and SetDeadline triggers a timeout.
+ ctx, cancel := context.WithTimeout(context.Background(),
100*time.Millisecond)
+ defer cancel()
+
+ _, err := c.sendAndFetchResponse(ctx, []byte{}, command.Code(0))
+ if err == nil {
+ t.Fatal("expected timeout error, got nil")
+ }
+
+ if !errors.Is(err, context.DeadlineExceeded) {
+ t.Errorf("got %v, want context.DeadlineExceeded", err)
+ }
+ // After a timeout, the connection should be invalidated.
+ if c.state != iggcon.StateDisconnected {
Review Comment:
This asserts `c.state == StateDisconnected` after a timeout, which codifies the
sticky-failure behavior (closed conn, no reconnect path) as expected. If that
behavior gets fixed, this test has to change. Separately, every test here
exercises a single operation. None covers `connect()` reassigning `c.conn`
while an `AfterFunc` callback from a prior operation is still live, which is
exactly the race this PR introduces. `go test -race` stays green only because
of that gap. It's worth adding a concurrent-reassignment test.
##########
foreign/go/internal/util/leader_aware.go:
##########
@@ -29,10 +30,10 @@ import (
// CheckAndRedirectToLeader queries the client for cluster metadata and returns
// an address to redirect to (empty string means no redirection needed).
-func CheckAndRedirectToLeader(c iggcon.Client, currentAddress string,
transport iggcon.Protocol) (string, error) {
+func CheckAndRedirectToLeader(ctx context.Context, c iggcon.Client,
currentAddress string, transport iggcon.Protocol) (string, error) {
log.Println("Checking cluster metadata for leader detection")
- meta, err := c.GetClusterMetadata()
+ meta, err := c.GetClusterMetadata(ctx)
Review Comment:
If `ctx` is cancelled during this `GetClusterMetadata` call,
`sendAndFetchResponse` closes the connection via `invalidateConnLocked` and
returns `ctx.Err()`. However, the `if err != nil` block just below logs it and
returns `("", nil)`, swallowing the cancellation. `HandleLeaderRedirection`
then returns `(false, nil)`, and `LoginUser` returns `identity, nil`: a
successful login on a dead connection. The swallow is intentional for
non-clustered servers, so don't propagate all errors blindly. Instead, check
`ctx.Err()` specifically and return it before the swallow.
##########
foreign/go/client/tcp/tcp_core.go:
##########
@@ -256,49 +258,102 @@ func (c *IggyTcpClient) write(payload []byte) (int,
error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte,
error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
}
- return c.sendAndFetchResponse(data, cmd.Code())
+ return c.sendAndFetchResponse(ctx, data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(ctx context.Context, message
[]byte, command command.Code) ([]byte, error) {
+ if ctx == nil {
+ return nil, errors.New("nil context")
+ }
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
c.mtx.Lock()
defer c.mtx.Unlock()
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // deadlineMu makes sure that the deadline won't be set to now by the
Review Comment:
The comment claims `deadlineMu` stops the callback from setting the deadline to
now right after `clearDeadline` clears it. But a mutex only provides mutual
exclusion, not ordering. Suppose the callback has already started, so the
`stop()` inside `clearDeadline` returned false, and `clearDeadline` acquires
`deadlineMu` first. It clears the deadline; the callback then runs and sets it
to `time.Now()`, leaving the connection with a past deadline. This needs a
`cleared` flag, set under `deadlineMu`, that the callback checks before touching
the deadline; the mutex alone is not enough.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]