gunli opened a new issue, #11954:
URL: https://github.com/apache/inlong/issues/11954

   ### What happened
   
   In Golang SDK, when we receive a response, we discard the ring buffer of 
gent before we handling the response, which will cause data race.
   
   ```go
   func (c *client) OnTraffic(conn gnet.Conn) (action gnet.Action) {
        // c.log.Debug("response received")
        for {
                total := conn.InboundBuffered()
                if total < heartbeatRspLen {
                        break
                }
   
                buf, _ := conn.Peek(total)
                // if it is a heartbeat response, skip it and read the next 
package
                if bytes.Equal(buf[:heartbeatRspLen], heartbeatRsp) {
                        _, err := conn.Discard(heartbeatRspLen)
                        if err != nil {
                                
c.metrics.incError(errConnReadFailed.getStrCode())
                                c.log.Error("discard connection stream failed, 
err", err)
                                // read failed, close the connection
                                return gnet.Close
                        }
   
                        c.log.Debug("heartbeat rsp receive")
                        continue
                }
   
                length, payloadOffset, payloadOffsetEnd, err := 
c.framer.ReadFrame(buf)
                if errors.Is(err, framer.ErrIncompleteFrame) {
                        break
                }
   
                if err != nil {
                        c.metrics.incError(errConnReadFailed.getStrCode())
                        c.log.Error("invalid packet from stream connection, 
close it, err:", err)
                        // read failed, close the connection
                        return gnet.Close
                }
   
                frame, _ := conn.Peek(length)
                _, err = conn.Discard(length)                                   
                      // discard will release the buffer and it can be written 
again
                if err != nil {
                        c.metrics.incError(errConnReadFailed.getStrCode())
                        c.log.Error("discard connection stream failed, err", 
err)
                        // read failed, close the connection
                        return gnet.Close
                }
   
                // handle response
                c.onResponse(frame[payloadOffset:payloadOffsetEnd])             
  // handle the response here, but the frame may be truncated
        }
        return gnet.None
   }
   ```             
   
   ### What you expected to happen
   
   Discard the ring buffer after handling the response.
   
   ### How to reproduce
   
   It is hard to reproduce
   
   ### Environment
   
   _No response_
   
   ### InLong version
   
   master
   
   ### InLong Component
   
   InLong SDK
   
   ### Are you willing to submit PR?
   
   - [x] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@inlong.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to