chengxilo commented on code in PR #2964:
URL: https://github.com/apache/iggy/pull/2964#discussion_r3242821757
##########
foreign/go/client/tcp/tcp_core.go:
##########
@@ -256,49 +258,102 @@ func (c *IggyTcpClient) write(payload []byte) (int,
error) {
}
// do sends the command to the Iggy server and returns the response.
-func (c *IggyTcpClient) do(cmd command.Command) ([]byte, error) {
+func (c *IggyTcpClient) do(ctx context.Context, cmd command.Command) ([]byte,
error) {
data, err := cmd.MarshalBinary()
if err != nil {
return nil, err
}
- return c.sendAndFetchResponse(data, cmd.Code())
+ return c.sendAndFetchResponse(ctx, data, cmd.Code())
}
-func (c *IggyTcpClient) sendAndFetchResponse(message []byte, command
command.Code) ([]byte, error) {
+func (c *IggyTcpClient) sendAndFetchResponse(ctx context.Context, message
[]byte, command command.Code) ([]byte, error) {
+ if ctx == nil {
+ return nil, errors.New("nil context")
+ }
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
c.mtx.Lock()
defer c.mtx.Unlock()
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+
+ // deadlineMu makes sure that the deadline won't be set to now by the
+ // AfterFunc callback right after we call SetDeadline(time.Time{}) to
+ // clear the deadline after the operation is done.
+ var deadlineMu sync.Mutex
+
+ stop := context.AfterFunc(ctx, func() {
+ deadlineMu.Lock()
+ defer deadlineMu.Unlock()
+ _ = c.conn.SetDeadline(time.Now())
+ })
+ defer stop()
+
+ clearDeadline := func() {
+ stop()
+ deadlineMu.Lock()
+ defer deadlineMu.Unlock()
+ _ = c.conn.SetDeadline(time.Time{})
+ }
+
+ // ioError returns ctx.Err() when the context caused the I/O failure,
+ // so callers get a consistent context.Canceled /
context.DeadlineExceeded
+ // instead of a low-level net timeout error.
+ ioError := func(err error) error {
+ if ctxErr := ctx.Err(); ctxErr != nil {
+ return ctxErr
+ }
+ return err
+ }
+
payload := createPayload(message, command)
if _, err := c.write(payload); err != nil {
- return nil, err
+ c.invalidateConnLocked()
+ return nil, ioError(err)
}
readBytes, buffer, err := c.read(ResponseInitialBytesLength)
if err != nil {
- return nil, fmt.Errorf("failed to read response for TCP
request: %w", err)
+ c.invalidateConnLocked()
+ return nil, ioError(err)
}
if readBytes != ResponseInitialBytesLength {
+ c.invalidateConnLocked()
return nil, fmt.Errorf("received an invalid or empty response:
%w", ierror.EmptyResponse{})
}
if status := ierror.Code(binary.LittleEndian.Uint32(buffer[0:4]));
status != 0 {
+ clearDeadline()
return nil, ierror.FromCode(status)
}
length := int(binary.LittleEndian.Uint32(buffer[4:]))
if length <= 1 {
+ clearDeadline()
return []byte{}, nil
}
_, buffer, err = c.read(length)
if err != nil {
- return nil, err
+ c.invalidateConnLocked()
+ return nil, ioError(err)
}
+ clearDeadline()
return buffer, nil
}
+// invalidateConnLocked closes the connection and marks it as disconnected
+func (c *IggyTcpClient) invalidateConnLocked() {
Review Comment:
Yeah I almost forgot about this . I was planning to solve this in another
PR. Maybe I can create a PR to implement the reconnect logic first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]