To clarify: in a bi-di stream, messages in one direction are received in the order they were sent. For example, the requests a client sends to a server are always received in the order they were sent, and the same holds for the responses the server sends to the client. However, there is no ordering guarantee between requests and responses. For example, if the client sends 3 requests and the server sends 3 responses, nothing at the TCP or gRPC level guarantees that the client receives the 1st response before it sends the 2nd request. If that guarantee is needed, the application has to implement it.
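One way the application can enforce that ordering is to send and receive from a single goroutine, waiting for each response before sending the next request. Here is a minimal sketch of that idea. The `Stream` interface, `fakeStream`, and `SendInLockstep` are hypothetical names made up for illustration; `fakeStream` is an in-memory stand-in so the example runs without a gRPC server, and it assumes the server sends exactly one response per request, which a real service may not do.

```go
package main

import (
	"fmt"
	"io"
)

// Stream is a hypothetical stand-in for the Send/Recv/CloseSend methods
// of a generated bi-di client stream. Messages are plain strings here.
type Stream interface {
	Send(req string) error
	Recv() (string, error)
	CloseSend() error
}

// fakeStream is an in-memory stream used only for this sketch.
// Its "server" goroutine answers every request with exactly one response.
type fakeStream struct {
	reqs  chan string
	resps chan string
}

func newFakeStream() *fakeStream {
	s := &fakeStream{reqs: make(chan string), resps: make(chan string)}
	go func() {
		for r := range s.reqs {
			s.resps <- "ack:" + r
		}
		// The client half-closed, so end the response direction too.
		close(s.resps)
	}()
	return s
}

func (s *fakeStream) Send(req string) error { s.reqs <- req; return nil }

func (s *fakeStream) Recv() (string, error) {
	r, ok := <-s.resps
	if !ok {
		return "", io.EOF
	}
	return r, nil
}

func (s *fakeStream) CloseSend() error { close(s.reqs); return nil }

// SendInLockstep sends request n+1 only after the response to request n
// has been received, and half-closes only after the last response.
func SendInLockstep(s Stream, reqs []string) ([]string, error) {
	var resps []string
	for _, req := range reqs {
		if err := s.Send(req); err != nil {
			return resps, err
		}
		resp, err := s.Recv()
		if err != nil {
			return resps, err
		}
		resps = append(resps, resp)
	}
	return resps, s.CloseSend()
}

func main() {
	resps, err := SendInLockstep(newFakeStream(), []string{"1", "2", "3"})
	fmt.Println(resps, err)
}
```

Because CloseSend is only called after the last Recv returns, no sleep is needed between requests or before closing. With a real service you would probably keep calling Recv until a response carrying the Id of the request you just sent arrives, rather than assuming one response per request.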

On Mon, Oct 5, 2020 at 3:34 PM yoges nsamy <[email protected]> wrote:

> >> So you don't have to pause inbetween requests, just pause at the end of
> sending before calling CloseSend.
>
> Thank you, this solved my issue.
>
> >> Also, messages in a single direction are received in order. This is
> guaranteed by TCP.
>
> My grpc server has been defined as bi-directional.
> Does your statement mean I can still send messages in a single direction
> even if it's defined as bi-di; I assumed it has to be a stream.
> Appreciate some guidance.
>
>
> On Thursday, 17 September 2020 at 21:22:40 UTC+8 [email protected] wrote:
>
>> I think the problem in this code is because of the sender ending too
>> soon. When the sender calls CloseSend, the server will end the other side
>> of the stream as well which causes the receiver to get an EOF and fails to
>> receive the desired response. So you don't have to pause inbetween
>> requests, just pause at the end of sending before calling CloseSend.
>>
>> Also, messages in a single direction are received in order. This is
>> guaranteed by TCP.
>>
>> On Thu, Sep 17, 2020 at 4:52 AM 'Easwar Swaminathan' via grpc.io <
>> [email protected]> wrote:
>>
>>> A bidirectional streaming gRPC call is one where the two streams
>>> (sending and receiving) are totally independent of each other. Messages
>>> sent by one end will be received by the other end in the order in which it
>>> was sent. But there can be no guarantees on the order of messages across
>>> the streams. If you want to impose ordering between the sending and
>>> receiving, it has to be done at the application layer. For example, instead
>>> of sending all five messages from one goroutine and reading all responses
>>> in another, you could use a single goroutine to send and receive messages
>>> in order.
>>>
>>> On Thursday, August 27, 2020 at 10:58:05 AM UTC-7
>>> [email protected] wrote:
>>>
>>>>
>>>> I'm calling a bi-directional streaming service with five messages which
>>>> needs to be sent in sequence.
>>>> For example pbx.ClientMsg_Sub (Id=3) should be sent and *completed*
>>>> before pbx.ClientMsg_Set(Id=4).
>>>>
>>>> I notice that if I don't delay the sending of each message, I don't get
>>>> the expected response.
>>>> for _, req := range requests {
>>>>     fmt.Printf("Sending message: %v\n", req)
>>>>     stream.Send(req)
>>>>     time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>>>> }
>>>>
>>>> My full code is shared below.
>>>>
>>>> func getUserByCUID(w http.ResponseWriter, req *http.Request) {
>>>>     enableCors(&w)
>>>>
>>>>     crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
>>>>     creds, err := credentials.NewClientTLSFromFile(crtFile, "")
>>>>     if err != nil {
>>>>         log.Fatal("Error loading cert", err)
>>>>     }
>>>>
>>>>     conn, err := grpc.Dial("dev.mhub.my:16060", grpc.WithTransportCredentials(creds))
>>>>     if err != nil {
>>>>         log.Fatal("Error dialing", err)
>>>>     }
>>>>
>>>>     c := pbx.NewNodeClient(conn)
>>>>     // we create a stream by invoking the client
>>>>     stream, err := c.MessageLoop(context.Background())
>>>>     if err != nil {
>>>>         log.Fatalf("Error while creating stream: %v", err)
>>>>         return
>>>>     }
>>>>
>>>>     // get user's cognito user id
>>>>     cuID, ok := req.URL.Query()["cid"]
>>>>
>>>>     if !ok || len(cuID[0]) < 1 {
>>>>         log.Println("Url Param 'cid' is missing")
>>>>         return
>>>>     }
>>>>     tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
>>>>     resp := Response{"n/a"}
>>>>
>>>>     requests := []*pbx.ClientMsg{
>>>>         &pbx.ClientMsg{
>>>>             Message: &pbx.ClientMsg_Hi{
>>>>                 Hi: &pbx.ClientHi{
>>>>                     Id: "1",
>>>>                     UserAgent: "Golang_Spider_Bot/3.0",
>>>>                     Ver: "0.15",
>>>>                     Lang: "EN",
>>>>                 }},
>>>>         },
>>>>         &pbx.ClientMsg{
>>>>             Message: &pbx.ClientMsg_Login{
>>>>                 Login: &pbx.ClientLogin{
>>>>                     Id: "2",
>>>>                     Scheme: "basic",
>>>>                     Secret: []byte("carol:carol123"),
>>>>                 }},
>>>>         },
>>>>         &pbx.ClientMsg{
>>>>             Message: &pbx.ClientMsg_Sub{
>>>>                 Sub: &pbx.ClientSub{
>>>>                     Id: "3",
>>>>                     Topic: "fnd",
>>>>                     GetQuery: &pbx.GetQuery{
>>>>                         What: "sub",
>>>>                     },
>>>>                 },
>>>>             },
>>>>         },
>>>>         &pbx.ClientMsg{
>>>>             Message: &pbx.ClientMsg_Set{
>>>>                 Set: &pbx.ClientSet{
>>>>                     Id: "4",
>>>>                     Topic: "fnd",
>>>>                     Query: &pbx.SetQuery{
>>>>                         Desc: &pbx.SetDesc{
>>>>                             Public: []byte(tag),
>>>>                         },
>>>>                     },
>>>>                 },
>>>>             },
>>>>         },
>>>>         &pbx.ClientMsg{
>>>>             Message: &pbx.ClientMsg_Get{
>>>>                 Get: &pbx.ClientGet{
>>>>                     Id: "5",
>>>>                     Topic: "fnd",
>>>>                     Query: &pbx.GetQuery{
>>>>                         What: "sub",
>>>>                     },
>>>>                 }},
>>>>         },
>>>>     }
>>>>
>>>>     waitc := make(chan struct{})
>>>>     // we send a bunch of messages to the client (go routine)
>>>>     go func() {
>>>>         // function to send a bunch of messages
>>>>         for _, req := range requests {
>>>>             fmt.Printf("Sending message: %v\n", req)
>>>>             stream.Send(req)
>>>>             time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>>>>         }
>>>>         stream.CloseSend()
>>>>     }()
>>>>     // we receive a bunch of messages from the client (go routine)
>>>>     // count := 0
>>>>     go func() {
>>>>         // function to receive a bunch of messages
>>>>         uid := "n/a"
>>>>         for {
>>>>             res, err := stream.Recv()
>>>>             if err == io.EOF {
>>>>                 resp.TinodeUserID = uid
>>>>                 js, err := json.Marshal(resp)
>>>>                 if err != nil {
>>>>                     http.Error(w, err.Error(), http.StatusInternalServerError)
>>>>                     return
>>>>                 }
>>>>
>>>>                 w.Header().Set("Content-Type", "application/json")
>>>>                 w.Write(js)
>>>>                 break
>>>>             }
>>>>             if err != nil {
>>>>                 log.Fatalf("Error while receiving: %v", err)
>>>>                 break
>>>>             }
>>>>             if res.GetMeta() != nil {
>>>>                 if res.GetMeta().Id == "5" {
>>>>                     sub := res.GetMeta().GetSub()
>>>>                     for _, elem := range sub {
>>>>                         uid = elem.UserId
>>>>                     }
>>>>                 }
>>>>             }
>>>>         }
>>>>         close(waitc)
>>>>     }()
>>>>
>>>>     // block until everything is done
>>>>     <-waitc
>>>> }
>>>>
>>>>
>>>>
>>>> Response when operation is paused before sending the next message:
>>>> {"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result
>>>>
>>>> Response when operation is not paused before sending the next message:
>>>> {"tinodeuserid":"n/a"}
>>>>
>>>>
>>>> What is the right way to make sure the messages are sent in sequence?
>>>> My objective is to send message n+1 after message n completes
>>>> (something like using await in javascript).
>>>>
>>>> Thank you.
>>>>
>>>>
>>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "grpc.io" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/grpc-io/d9d377dc-63aa-4b99-8d79-cf49c15cb75dn%40googlegroups.com
>>> <https://groups.google.com/d/msgid/grpc-io/d9d377dc-63aa-4b99-8d79-cf49c15cb75dn%40googlegroups.com?utm_medium=email&utm_source=footer>
>>> .
