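A bidirectional streaming gRPC call has two streams, one for sending and one for receiving, and they are completely independent of each other. Messages sent by one end are received by the other end in the order in which they were sent, but there is no ordering guarantee across the two streams. Note also that stream.Send returns once the message has been handed to the transport; it does not wait for the server to receive or process it, so a loop of Send calls tells you nothing about whether message n has "completed" before message n+1 goes out. If you want to impose an ordering between sending and receiving, you have to do it at the application layer. For example, instead of sending all five messages from one goroutine and reading all responses in another, you could use a single goroutine that sends a message, waits for its response with Recv, and only then sends the next one.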
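A minimal sketch of that single-goroutine approach is below. It is not your actual client: `msg`, `bidiStream` and `fakeStream` are hypothetical stand-ins declared locally so the example compiles without the gRPC or pbx packages, and it assumes the server answers every request with at least one message that echoes the request's id (as the Meta response with Id "5" does in your code). Responses with a different id are simply skipped here, which may or may not be what you want.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// msg is a hypothetical stand-in for the generated request/response types;
// only the ID is needed to pair a response with its request.
type msg struct {
	ID   string
	Body string
}

// bidiStream mirrors the Send/Recv/CloseSend shape of a generated gRPC
// bidirectional-streaming client. It is declared locally (an assumption)
// so the sketch builds with the standard library alone.
type bidiStream interface {
	Send(*msg) error
	Recv() (*msg, error)
	CloseSend() error
}

// sendInOrder sends each request and then blocks on Recv until a response
// carrying the same ID arrives, so request n+1 is never sent before the
// server has answered request n.
func sendInOrder(s bidiStream, reqs []*msg) ([]*msg, error) {
	replies := make([]*msg, 0, len(reqs))
	for _, req := range reqs {
		if err := s.Send(req); err != nil {
			return replies, fmt.Errorf("send %s: %w", req.ID, err)
		}
		for {
			res, err := s.Recv()
			if err != nil {
				// io.EOF here means the stream ended before the answer came.
				return replies, fmt.Errorf("recv for %s: %w", req.ID, err)
			}
			if res.ID == req.ID {
				replies = append(replies, res)
				break
			}
			// Any response with another ID (e.g. a notification) is skipped.
		}
	}
	return replies, s.CloseSend()
}

// fakeStream is an in-memory echo server used only to make the sketch runnable.
type fakeStream struct {
	pending []*msg
	closed  bool
}

func (f *fakeStream) Send(m *msg) error {
	if f.closed {
		return errors.New("send on closed stream")
	}
	if m.Body == "sub" {
		// Simulate an unsolicited message arriving before the real answer.
		f.pending = append(f.pending, &msg{ID: "", Body: "notification"})
	}
	f.pending = append(f.pending, &msg{ID: m.ID, Body: "ack " + m.Body})
	return nil
}

func (f *fakeStream) Recv() (*msg, error) {
	if len(f.pending) == 0 {
		return nil, io.EOF
	}
	m := f.pending[0]
	f.pending = f.pending[1:]
	return m, nil
}

func (f *fakeStream) CloseSend() error {
	f.closed = true
	return nil
}

func main() {
	reqs := []*msg{{ID: "1", Body: "hi"}, {ID: "2", Body: "login"}, {ID: "3", Body: "sub"}}
	replies, err := sendInOrder(&fakeStream{}, reqs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range replies {
		fmt.Println(r.ID, r.Body)
	}
}
```

In a handler like yours, a loop of this shape would run directly in the handler goroutine, so the time.Sleep and the waitc channel should no longer be needed; the JSON response can be written once the loop returns.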

On Thursday, August 27, 2020 at 10:58:05 AM UTC-7 [email protected] wrote:
>
> I'm calling a bi-directional streaming service with five messages which
> needs to be sent in sequence.
> For example pbx.ClientMsg_Sub (Id=3) should be sent and *completed*
> before pbx.ClientMsg_Set(Id=4).
>
> I notice that if I don't delay the sending of each message, I don't get
> the expected response.
>
> for _, req := range requests {
>     fmt.Printf("Sending message: %v\n", req)
>     stream.Send(req)
>     time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
> }
>
> My full code is shared below.
>
> func getUserByCUID(w http.ResponseWriter, req *http.Request) {
>     enableCors(&w)
>
>     crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
>     creds, err := credentials.NewClientTLSFromFile(crtFile, "")
>     if err != nil {
>         log.Fatal("Error loading cert", err)
>     }
>
>     conn, err := grpc.Dial("dev.mhub.my:16060", grpc.WithTransportCredentials(creds))
>     if err != nil {
>         log.Fatal("Error dialing", err)
>     }
>
>     c := pbx.NewNodeClient(conn)
>     // we create a stream by invoking the client
>     stream, err := c.MessageLoop(context.Background())
>     if err != nil {
>         log.Fatalf("Error while creating stream: %v", err)
>         return
>     }
>
>     // get user's cognito user id
>     cuID, ok := req.URL.Query()["cid"]
>
>     if !ok || len(cuID[0]) < 1 {
>         log.Println("Url Param 'cid' is missing")
>         return
>     }
>     tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
>     resp := Response{"n/a"}
>
>     requests := []*pbx.ClientMsg{
>         &pbx.ClientMsg{
>             Message: &pbx.ClientMsg_Hi{
>                 Hi: &pbx.ClientHi{
>                     Id: "1",
>                     UserAgent: "Golang_Spider_Bot/3.0",
>                     Ver: "0.15",
>                     Lang: "EN",
>                 }},
>         },
>         &pbx.ClientMsg{
>             Message: &pbx.ClientMsg_Login{
>                 Login: &pbx.ClientLogin{
>                     Id: "2",
>                     Scheme: "basic",
>                     Secret: []byte("carol:carol123"),
>                 }},
>         },
>         &pbx.ClientMsg{
>             Message: &pbx.ClientMsg_Sub{
>                 Sub: &pbx.ClientSub{
>                     Id: "3",
>                     Topic: "fnd",
>                     GetQuery: &pbx.GetQuery{
>                         What: "sub",
>                     },
>                 },
>             },
>         },
>         &pbx.ClientMsg{
>             Message: &pbx.ClientMsg_Set{
>                 Set: &pbx.ClientSet{
>                     Id: "4",
>                     Topic: "fnd",
>                     Query: &pbx.SetQuery{
>                         Desc: &pbx.SetDesc{
>                             Public: []byte(tag),
>                         },
>                     },
>                 },
>             },
>         },
>         &pbx.ClientMsg{
>             Message: &pbx.ClientMsg_Get{
>                 Get: &pbx.ClientGet{
>                     Id: "5",
>                     Topic: "fnd",
>                     Query: &pbx.GetQuery{
>                         What: "sub",
>                     },
>                 }},
>         },
>     }
>
>     waitc := make(chan struct{})
>     // we send a bunch of messages to the client (go routine)
>     go func() {
>         // function to send a bunch of messages
>         for _, req := range requests {
>             fmt.Printf("Sending message: %v\n", req)
>             stream.Send(req)
>             time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>         }
>         stream.CloseSend()
>     }()
>     // we receive a bunch of messages from the client (go routine)
>     // count := 0
>     go func() {
>         // function to receive a bunch of messages
>         uid := "n/a"
>         for {
>             res, err := stream.Recv()
>             if err == io.EOF {
>                 resp.TinodeUserID = uid
>                 js, err := json.Marshal(resp)
>                 if err != nil {
>                     http.Error(w, err.Error(), http.StatusInternalServerError)
>                     return
>                 }
>
>                 w.Header().Set("Content-Type", "application/json")
>                 w.Write(js)
>                 break
>             }
>             if err != nil {
>                 log.Fatalf("Error while receiving: %v", err)
>                 break
>             }
>             if res.GetMeta() != nil {
>                 if res.GetMeta().Id == "5" {
>                     sub := res.GetMeta().GetSub()
>                     for _, elem := range sub {
>                         uid = elem.UserId
>                     }
>                 }
>             }
>         }
>         close(waitc)
>     }()
>
>     // block until everything is done
>     <-waitc
> }
>
> Response when operation is paused before sending the next message:
> {"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result
>
> Response when operation is not paused before sending the next message:
> {"tinodeuserid":"n/a"}
>
> What is the right way to make sure the messages are sent in sequence?
> My objective is to send message n+1 after message n completes (something
> like using await in javascript).
>
> Thank you.
