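I ran into a similar problem, also with bidirectional RPC streaming (akin to full-duplex communication over a long-lived connection), with multiple clients and one server. Each client opens a new connection to the server, sends a single control command (one Send-Recv exchange), and then closes the connection.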
You can look at this part of the grpc-go source code <https://github.com/grpc/grpc-go/blob/4f21cde702d9f9b1c874791e1c3751b1f7d192ce/server.go#L779>: the server initializes a separate stream RPC object for each incoming streaming call (one per client connection in this setup), so calling Recv and Send in pairs inside the handler function guarantees that requests and replies on the same connection match up.

As for the error the original poster mentioned, “An error ocurred: rpc error: code = Canceled desc = context canceled.”, my guess is that the Send and Recv calls of several stream RPC objects get mismatched across goroutines. For example, the reply to A's message is sent on B's stream RPC object (B's client receives the reply and closes its connection), and when B's handler then calls Recv, it fails with this error.

On Monday, November 6, 2017 at 2:36:36 PM UTC+8, <xdrag...@gmail.com> wrote:
> Hi,
>
> I wonder if you have resolved this problem, because I got a similar one.
>
>
> On Monday, May 22, 2017 at 6:13:01 PM UTC+8, Manca Bizjak wrote:
>>
>> Hi,
>>
>> Thank you for your reply. I am working on a minimal example to reproduce
>> this. In the meanwhile, I will try to provide some further clarifications
>> below.
>>
>>> Is each client an independent ClientConn, or one stream from the same
>>> ClientConn (I assume independent ClientConn because you said it maintains
>>> its own connection).
>>>
>>
>> Every client maintains its own ClientConn and a corresponding stream.
>> Here's my Client struct (grpc-related fields are in bold). This Client
>> struct implements the grpc client stub. Several of such clients are
>> instantiated concurrently, each starting its own RPC.
>>
>> type Client struct {
>>     id int32
>>     *conn *grpc.ClientConn*
>>     *protocolClient *pb.ProtocolClient*
>>     *stream* *pb.Protocol_RunClient* // this is obtained via protocolClient.Run(context.Background())
>>     schema pb.SchemaType
>>     variant pb.SchemaVariant
>>     handler *ClientHandler
>> }
>>
>>> Can you provide more details about what's done in the RPC service
>>> handler? How do you deal with the stream when RPC finishes
>>>
>>
>> I have found that doing "stream.CloseSend" on one Client closes the
>> stream for all other clients as well (other clients that have not yet
>> received responses from the server get the EOF error during stream.Recv()).
>> This is why I'm currently not explicitly closing the stream, either from
>> the client side or from the server side. As for the server-side logic, for
>> now it is just supposed to 1.) receive an initialization message from the
>> client, 2.) read some data from this message, 3.) send a response to the
>> client. I am pasting a part of my implementation (relevant parts are again
>> in bold).
>>
>>> type Server struct {
>>>     handler ServerHandler
>>>     *stream pb.Protocol_RunServer*
>>> }
>>> func (s *Server) Run(stream pb.Protocol_RunServer) error {
>>>     *s.stream = stream*
>>>     log.Printf("[Server] New Client connected")
>>>     *for {*
>>>         *req, err := s.recieve()*
>>>         if err != nil {
>>>             log.Println("Got error when trying to receive from stream")
>>>         }
>>>         switch req.Content.(type) {
>>>         case *pb.Message_Empty:
>>>             log.Println("Got empty message, indicating start of a protocol")
>>>             reqSchemaType := req.GetSchema()
>>>             reqSchemaVariant := req.GetSchemaVariant()
>>>             reqSchemaTypeStr := pb.SchemaType_name[int32(reqSchemaType)]
>>>             reqSchemaVariantStr := pb.SchemaVariant_name[int32(reqSchemaVariant)]
>>>             reqClientId := req.GetClientId()
>>>             log.Println("Client [", reqClientId, "] requested", reqSchemaTypeStr,
>>>                 "variant", reqSchemaVariantStr)
>>>             switch reqSchemaType {
>>>             case pb.SchemaType_PEDERSEN_EC:
>>>
>>>                 s.handler.pedersenECReciever = commitments.NewPedersenECReceiver()
>>>                 h := s.handler.pedersenECReciever.GetH()
>>>                 ecge := pb.ECGroupElement{X: h.X.Bytes(), Y: h.Y.Bytes()}
>>>                 resp := &pb.Message{Content: &pb.Message_EcGroupElement{&ecge}}
>>>                 *err := s.send(resp)*
>>>                 if err != nil {
>>>                     return err
>>>                 }
>>>                 log.Printf("[Server] Sent response to initial message")
>>>
>>>             default:
>>>                 log.Println("The requested protocol is currently unsupported.")
>>>             }
>>>         default:
>>>             log.Printf("[Server] Received an intermediate request", req)
>>>         }
>>>     *}*
>>>     return nil
>>> }
>>
>>
>> In the meanwhile, I did some further debugging and I have found that for
>> each concurrent client a new RPC is created (so far so good). I mentioned
>> that every "rpc transaction" between any of my clients and the server
>> involves 3 consecutive request-response pairs. To simplify, I am now
>> testing with only the first request-response pair (as the above server-side
>> code illustrates) and here's what I found: let's say I send a request from
>> 2 clients. As a result, 2 RPCs are started on the server. The server gets
>> the request from each client in its own RPC. However, the server then sends
>> both responses to a single RPC - the RPC of the first client. So the first
>> client always succeeds, but the second client "hangs" waiting for a
>> response from the server, which never arrives, since both responses from
>> the server are associated with the first RPC only. I'm attaching an image
>> that demonstrates this.
>>
>> I was wondering if there is any way I can associate a response with a
>> certain RPC?
>>
>>> If you could come up with a small example to reproduce this, it would
>>> also be very helpful.
>>>
>>
>> Sure, I will provide such an example shortly.
>>
>> Thanks!
>> Best regards