For anyone visiting this page in the future: I had a similar problem, and it turned out to be a high buffer size (a bug) used on the stream. This led to high I/O usage (we write the received chunks to disk), and under concurrent execution processes would hang and terminate with "rpc error: code = Canceled desc = context canceled".
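For illustration, a minimal sketch of what bounding that buffer can look like, assuming chunks arrive from a stream and are written to disk. The names writeChunks and maxChunkSize are made up, and the recv callback stands in for stream.Recv() on a real client-streaming RPC:

    package main

    import (
        "bufio"
        "io"
        "os"
    )

    // Keep stream messages small and bounded; oversized chunks inflate
    // memory use and cause bursty disk I/O when many streams write at once.
    const maxChunkSize = 64 * 1024

    // writeChunks drains a sequence of byte chunks to disk through a
    // bounded buffered writer. recv stands in for stream.Recv().
    func writeChunks(recv func() ([]byte, error), path string) error {
        f, err := os.Create(path)
        if err != nil {
            return err
        }
        defer f.Close()

        w := bufio.NewWriterSize(f, maxChunkSize)
        for {
            data, err := recv()
            if err == io.EOF {
                break // sender finished
            }
            if err != nil {
                return err
            }
            if _, err := w.Write(data); err != nil {
                return err
            }
        }
        return w.Flush()
    }

    func main() {
        // Feed writeChunks from an in-memory slice to keep the sketch
        // self-contained; a real caller would pass the RPC's Recv.
        chunks := [][]byte{[]byte("hello "), []byte("world")}
        i := 0
        recv := func() ([]byte, error) {
            if i == len(chunks) {
                return nil, io.EOF
            }
            c := chunks[i]
            i++
            return c, nil
        }
        if err := writeChunks(recv, "out.bin"); err != nil {
            panic(err)
        }
    }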
On Friday, 22 October 2021 at 08:13:54 UTC+5:30 aloisio wrote:

> I ran into a similar problem, also with a bidirectional streaming RPC (a
> long-lived, full-duplex connection): multiple clients and one server,
> where each client opens a new connection to the server, sends a single
> control command (Send-Recv), and then closes the connection.
>
> You can look at this part of the grpc-go source:
> https://github.com/grpc/grpc-go/blob/4f21cde702d9f9b1c874791e1c3751b1f7d192ce/server.go#L779
> The server initializes a stream RPC object for every new client
> connection. If you do Recv and Send in matched pairs inside the handler,
> the messages sent and received on the same connection are guaranteed to
> stay matched.
>
> As for the error the original poster mentioned ("An error occurred: rpc
> error: code = Canceled desc = context canceled."), my guess is that the
> Sends and Recvs of multiple stream RPC objects got mismatched across
> goroutines. For example, A's reply is sent on B's stream RPC object (so
> B's client receives the reply and closes its connection), and when B then
> calls Recv, it gets exactly this error.
>
> On Monday, 6 November 2017 at 14:36:36 UTC+8, xdrag...@gmail.com wrote:
>
>> Hi,
>>
>> I wonder if you have resolved this problem, because I got a similar one.
>>
>> On Monday, May 22, 2017 at 6:13:01 PM UTC+8, Manca Bizjak wrote:
>>>
>>> Hi,
>>>
>>> Thank you for your reply. I am working on a minimal example to
>>> reproduce this. In the meanwhile, I will try to provide some further
>>> clarifications below.
>>>
>>>> Is each client an independent ClientConn, or one stream from the same
>>>> ClientConn (I assume independent ClientConn because you said it
>>>> maintains its own connection)?
>>>
>>> Every client maintains its own ClientConn and a corresponding stream.
>>> Here is my Client struct (the gRPC-related fields are marked with
>>> // gRPC comments). This Client struct implements the gRPC client stub.
>>> Several such clients are instantiated concurrently, each starting its
>>> own RPC.
>>>
>>> type Client struct {
>>>     id             int32
>>>     conn           *grpc.ClientConn      // gRPC
>>>     protocolClient pb.ProtocolClient     // gRPC
>>>     stream         pb.Protocol_RunClient // gRPC; obtained via protocolClient.Run(context.Background())
>>>     schema         pb.SchemaType
>>>     variant        pb.SchemaVariant
>>>     handler        *ClientHandler
>>> }
>>>
>>>> Can you provide more details about what's done in the RPC service
>>>> handler? How do you deal with the stream when the RPC finishes?
>>>
>>> I have found that calling stream.CloseSend on one Client closes the
>>> stream for all other clients as well: other clients that have not yet
>>> received responses from the server get an EOF error during
>>> stream.Recv(). This is why I am currently not explicitly closing the
>>> stream from either the client side or the server side. As for the
>>> server-side logic, for now it is just supposed to 1) receive an
>>> initialization message from the client, 2) read some data from this
>>> message, and 3) send a response to the client. I am pasting part of my
>>> implementation (the relevant parts are again marked with // NOTE
>>> comments).
>>> type Server struct {
>>>     handler ServerHandler
>>>     stream  pb.Protocol_RunServer // NOTE: per-RPC stream stored on a shared struct
>>> }
>>>
>>> func (s *Server) Run(stream pb.Protocol_RunServer) error {
>>>     s.stream = stream // NOTE
>>>     log.Printf("[Server] New client connected")
>>>     for { // NOTE
>>>         req, err := s.receive() // NOTE: helper that receives via s.stream
>>>         if err != nil {
>>>             log.Println("Got error when trying to receive from stream")
>>>         }
>>>         switch req.Content.(type) {
>>>         case *pb.Message_Empty:
>>>             log.Println("Got empty message, indicating start of a protocol")
>>>             reqSchemaType := req.GetSchema()
>>>             reqSchemaVariant := req.GetSchemaVariant()
>>>             reqSchemaTypeStr := pb.SchemaType_name[int32(reqSchemaType)]
>>>             reqSchemaVariantStr := pb.SchemaVariant_name[int32(reqSchemaVariant)]
>>>             reqClientId := req.GetClientId()
>>>             log.Println("Client [", reqClientId, "] requested", reqSchemaTypeStr, "variant", reqSchemaVariantStr)
>>>             switch reqSchemaType {
>>>             case pb.SchemaType_PEDERSEN_EC:
>>>                 s.handler.pedersenECReceiver = commitments.NewPedersenECReceiver()
>>>                 h := s.handler.pedersenECReceiver.GetH()
>>>                 ecge := pb.ECGroupElement{X: h.X.Bytes(), Y: h.Y.Bytes()}
>>>                 resp := &pb.Message{Content: &pb.Message_EcGroupElement{&ecge}}
>>>                 err := s.send(resp) // NOTE: helper that sends via s.stream
>>>                 if err != nil {
>>>                     return err
>>>                 }
>>>                 log.Printf("[Server] Sent response to initial message")
>>>             default:
>>>                 log.Println("The requested protocol is currently unsupported.")
>>>             }
>>>         default:
>>>             log.Println("[Server] Received an intermediate request:", req)
>>>         }
>>>     } // NOTE
>>> }
>>>
>>> In the meanwhile, I did some further debugging, and I found that a new
>>> RPC is created for each concurrent client (so far so good). I mentioned
>>> that every "RPC transaction" between any of my clients and the server
>>> involves 3 consecutive request-response pairs. To simplify, I am now
>>> testing with only the first request-response pair (as the server-side
>>> code above illustrates), and here is what I found. Let's say I send a
>>> request from 2 clients. As a result, 2 RPCs are started on the server,
>>> and the server gets the request from each client in its own RPC.
>>> However, the server then sends both responses to a single RPC: the RPC
>>> of the first client. So the first client always succeeds, but the
>>> second client hangs waiting for a response from the server that never
>>> arrives, since both responses are associated with the first RPC only.
>>> I'm attaching an image that demonstrates this.
>>>
>>> I was wondering: is there any way I can associate a response with a
>>> certain RPC?
>>>
>>>> If you could come up with a small example to reproduce this, it would
>>>> also be very helpful.
>>>
>>> Sure, I will provide such an example shortly.
>>>
>>> Thanks!
>>> Best regards
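For anyone landing here with the same symptom, a minimal sketch of the fix that aloisio's reply and the debugging above both point at: keep every Recv/Send pair on the handler's local stream parameter instead of storing the stream on the shared Server struct, which is what lets one client's reply land on another client's RPC. Message, ProtocolRunServer, and fakeStream below are stand-ins invented so the sketch runs without the thread's generated pb package:

    package main

    import (
        "io"
        "log"
    )

    // Message and ProtocolRunServer are minimal stand-ins for the generated
    // types in the thread (pb.Message, pb.Protocol_RunServer); the real
    // ones come from protoc. They exist only to make this sketch runnable.
    type Message struct{ Content string }

    type ProtocolRunServer interface {
        Recv() (*Message, error)
        Send(*Message) error
    }

    // Server holds only state that is safe to share between connections.
    // Crucially, there is no stream field: the stream is per-RPC state.
    type Server struct{}

    // Run is invoked once per client connection. Every Recv/Send pair goes
    // through the local stream parameter, so a reply can only ever reach
    // the RPC it belongs to.
    func (s *Server) Run(stream ProtocolRunServer) error {
        log.Println("[Server] new client connected")
        for {
            req, err := stream.Recv() // receive on this client's own stream
            if err == io.EOF {
                return nil // client closed its send direction
            }
            if err != nil {
                return err
            }
            resp := &Message{Content: "ack: " + req.Content}
            if err := stream.Send(resp); err != nil { // reply on the same stream
                return err
            }
        }
    }

    // fakeStream is an in-memory ProtocolRunServer used to exercise Run.
    type fakeStream struct {
        in  chan *Message
        out chan *Message
    }

    func (f *fakeStream) Recv() (*Message, error) {
        m, ok := <-f.in
        if !ok {
            return nil, io.EOF
        }
        return m, nil
    }

    func (f *fakeStream) Send(m *Message) error {
        f.out <- m
        return nil
    }

    func main() {
        st := &fakeStream{in: make(chan *Message, 1), out: make(chan *Message, 1)}
        st.in <- &Message{Content: "hello"}
        close(st.in) // simulate the client closing its send direction
        go func() { _ = (&Server{}).Run(st) }()
        log.Println((<-st.out).Content) // prints "ack: hello"
    }

With this shape, two concurrent clients each get their own Run invocation and their own stream, so the "both responses go to the first client's RPC" behavior described above cannot occur.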