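I ran into a similar problem, also with bidirectional streaming RPC (similar 
to a long-lived, full-duplex connection). There are multiple clients and one 
server; each client opens a new connection to the server, sends one control 
command (a single Send-Recv exchange), and then closes the connection.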

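See this part of the grpc-go source code: 
<https://github.com/grpc/grpc-go/blob/4f21cde702d9f9b1c874791e1c3751b1f7d192ce/server.go#L779>. 
The server creates a separate stream object for each new streaming RPC (in 
this setup, one per client connection), so calling Recv and Send in pairs 
inside the handler function guarantees that requests and replies on the same 
connection match up.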
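
To illustrate the paired Recv/Send pattern, here is a minimal sketch that does not depend on grpc-go. The `Stream` interface, `sliceStream`, and `handle` are hypothetical names made up for this example; `Stream` only stands in for a generated server-side stream type such as `pb.Protocol_RunServer`. The one grpc-go behavior it relies on is that a server-side Recv returns `io.EOF` once the client has closed its send side.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Stream is a hypothetical stand-in for a generated server-side stream type.
type Stream interface {
	Recv() (string, error)
	Send(string) error
}

// sliceStream replays a fixed list of requests and then reports io.EOF,
// mimicking a client that has finished sending.
type sliceStream struct {
	reqs []string
	sent []string
}

func (s *sliceStream) Recv() (string, error) {
	if len(s.reqs) == 0 {
		return "", io.EOF
	}
	r := s.reqs[0]
	s.reqs = s.reqs[1:]
	return r, nil
}

func (s *sliceStream) Send(m string) error {
	s.sent = append(s.sent, m)
	return nil
}

// handle answers every message on the same stream it arrived on.
func handle(stream Stream) error {
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil // client is done sending; end the RPC cleanly
		}
		if err != nil {
			return err
		}
		if err := stream.Send("ack:" + req); err != nil {
			return err
		}
	}
}

func main() {
	s := &sliceStream{reqs: []string{"cmd1", "cmd2"}}
	fmt.Println(handle(s), s.sent)
}
```

Because `handle` only touches the `stream` value it was given, every reply necessarily goes back on the stream the request came from.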


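The error the original poster mentioned, "An error ocurred: rpc error: code = 
Canceled desc = context canceled.", is probably caused by Send and Recv calls 
getting mismatched between stream objects across multiple goroutines. For 
example, a reply meant for A is sent on B's stream object (B's client 
receives that reply and closes its connection), and when B's handler then 
calls Recv, it fails with this error.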
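
One plausible way such a mismatch can arise is storing the stream in a field of a server struct that is shared by all RPCs, as the quoted `s.stream = stream` does: each new RPC overwrites the field. The sketch below shows the alternative, under the assumption that the handler can work from its `stream` parameter alone. All names here (`Stream`, `fakeStream`, `serveAll`) are hypothetical and only simulate concurrent clients with channels; this is not grpc-go code.

```go
package main

import (
	"fmt"
	"sync"
)

// Stream is a hypothetical stand-in for a generated bidirectional stream type.
type Stream interface {
	Recv() (string, error)
	Send(string) error
}

// fakeStream is an in-memory Stream for one simulated client.
type fakeStream struct {
	in  chan string
	out chan string
}

func newFakeStream(req string) *fakeStream {
	s := &fakeStream{in: make(chan string, 1), out: make(chan string, 1)}
	s.in <- req
	return s
}

func (s *fakeStream) Recv() (string, error) { return <-s.in, nil }
func (s *fakeStream) Send(m string) error   { s.out <- m; return nil }

// Server holds only state that is safe to share between RPCs.
// Note that it has no stream field.
type Server struct{}

// Run handles one RPC and uses only the stream passed to this call.
func (s *Server) Run(stream Stream) error {
	req, err := stream.Recv()
	if err != nil {
		return err
	}
	return stream.Send("reply to " + req)
}

// serveAll runs one handler goroutine per simulated client and returns
// the reply each client received, in client order.
func serveAll(reqs []string) []string {
	srv := &Server{}
	streams := make([]*fakeStream, len(reqs))
	var wg sync.WaitGroup
	for i, r := range reqs {
		streams[i] = newFakeStream(r)
		wg.Add(1)
		go func(st Stream) {
			defer wg.Done()
			_ = srv.Run(st)
		}(streams[i])
	}
	wg.Wait()
	replies := make([]string, len(reqs))
	for i, st := range streams {
		replies[i] = <-st.out
	}
	return replies
}

func main() {
	fmt.Println(serveAll([]string{"A", "B"}))
}
```

If per-RPC state is needed beyond the stream itself, one option is to build a small per-call value inside `Run` rather than writing to fields of the shared `Server`.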

On Monday, November 6, 2017 at 2:36:36 PM UTC+8, xdrag...@gmail.com wrote:

> Hi,
>
> I wonder if you have resolved this problem? I ran into a similar one.
>
>
> On Monday, May 22, 2017 at 6:13:01 PM UTC+8, Manca Bizjak wrote:
>>
>> Hi,
>>
>> Thank you for your reply. I am working on a minimal example to reproduce 
>> this. In the meantime, I will try to provide some further clarifications 
>> below.
>>
>>> Is each client an independent ClientConn, or one stream from the same 
>>> ClientConn (I assume independent ClientConn because you said it maintains 
>>> its own connection).
>>>
>>
>> Every client maintains its own ClientConn and a corresponding stream. 
>> Here's my Client struct (grpc-related fields are in bold). This Client 
>> struct implements the grpc client stub. Several of such clients are 
>> instantiated concurrently, each starting its own RPC. 
>>
>> type Client struct {
>>     id       int32
>>     *conn     *grpc.ClientConn*
>>     *protocolClient  *pb.ProtocolClient*
>>     *stream*   *pb.Protocol_RunClient* // this is obtained via protocolClient.Run(context.Background())
>>     schema   pb.SchemaType
>>     variant  pb.SchemaVariant
>>     handler  *ClientHandler
>> }
>>
>>> Can you provide more details about what's done in the RPC service 
>>> handler? How do you deal with the stream when RPC finishes?
>>>
>>  
>> I have found that calling "stream.CloseSend" on one Client closes the 
>> stream for all other clients as well (other clients that have not yet 
>> received responses from the server get an EOF error during stream.Recv()). 
>> This is why I'm currently not explicitly closing the stream from either 
>> the client side or the server side. As for the server-side logic, for now 
>> it is just supposed to 1.) receive an initialization message from the 
>> client, 2.) read some data from this message, 3.) send a response to the 
>> client. I am pasting a part of my implementation (relevant parts are again 
>> in bold).
>>
>> type Server struct {
>>     handler ServerHandler
>>     *stream  pb.Protocol_RunServer*
>> }
>>
>> func (s *Server) Run(stream pb.Protocol_RunServer) error {
>>     *s.stream = stream*
>>     log.Printf("[Server] New Client connected")
>>     *for {*
>>         *req, err := s.recieve()*
>>         if err != nil {
>>             log.Println("Got error when trying to receive from stream")
>>         }
>>         switch req.Content.(type) {
>>         case *pb.Message_Empty:
>>             log.Println("Got empty message, indicating start of a protocol")
>>             reqSchemaType := req.GetSchema()
>>             reqSchemaVariant := req.GetSchemaVariant()
>>             reqSchemaTypeStr := pb.SchemaType_name[int32(reqSchemaType)]
>>             reqSchemaVariantStr := pb.SchemaVariant_name[int32(reqSchemaVariant)]
>>             reqClientId := req.GetClientId()
>>             log.Println("Client [", reqClientId, "] requested", reqSchemaTypeStr,
>>                 "variant", reqSchemaVariantStr)
>>             switch reqSchemaType {
>>             case pb.SchemaType_PEDERSEN_EC:
>>
>>                 s.handler.pedersenECReciever = commitments.NewPedersenECReceiver()
>>                 h := s.handler.pedersenECReciever.GetH()
>>                 ecge := pb.ECGroupElement{X: h.X.Bytes(), Y: h.Y.Bytes()}
>>                 resp := &pb.Message{Content: &pb.Message_EcGroupElement{&ecge}}
>>                 *err := s.send(resp)*
>>                 if err != nil {
>>                     return err
>>                 }
>>                 log.Printf("[Server] Sent response to initial message")
>>
>>             default:
>>                 log.Println("The requested protocol is currently unsupported.")
>>             }
>>         default:
>>             log.Printf("[Server] Received an intermediate request", req)
>>         }
>>     *}*
>>     return nil
>> }
>>
>>
>> In the meantime, I did some further debugging and I have found that for 
>> each concurrent client a new RPC is created (so far so good). I mentioned 
>> that every "rpc transaction" between any of my clients and the server 
>> involves 3 consecutive request-response pairs. To simplify, I am now 
>> testing with only the first request-response pair (as the above server-side 
>> code illustrates) and here's what I found: let's say I send a request from 2 
>> clients. As a result, 2 RPCs are started on the server. The server gets the 
>> request from each client in its own RPC. However, the server then sends 
>> both responses to a single RPC - the RPC of the first client. So the first 
>> client always succeeds, but the second client "hangs" waiting for a 
>> response from server, which never arrives, since both responses from the 
>> server are associated with the first RPC only. I'm attaching an image that 
>> demonstrates this.
>>
>> I was wondering if there is any way I can associate a response with a 
>> certain RPC? 
>>
>>> If you could come up with a small example to reproduce this, it would 
>>> also be very helpful.
>>>
>>  
>> Sure, I will provide such an example shortly. 
>>
>> Thanks!
>> Best regards
>>
>>
>>
