>> So you don't have to pause in between requests, just pause at the end of sending before calling CloseSend.
Thank you, this solved my issue.

>> Also, messages in a single direction are received in order. This is guaranteed by TCP.

My gRPC server has been defined as bi-directional. Does your statement mean I can still send messages in a single direction even if the service is defined as bi-directional? I assumed it has to be a stream. I'd appreciate some guidance.

On Thursday, 17 September 2020 at 21:22:40 UTC+8 [email protected] wrote:

> I think the problem in this code is that the sender ends too soon.
> When the sender calls CloseSend, the server will end the other side of the
> stream as well, which causes the receiver to get an EOF and fail to
> receive the desired response. So you don't have to pause in between
> requests, just pause at the end of sending before calling CloseSend.
>
> Also, messages in a single direction are received in order. This is
> guaranteed by TCP.
>
> On Thu, Sep 17, 2020 at 4:52 AM 'Easwar Swaminathan' via grpc.io <
> [email protected]> wrote:
>
>> A bidirectional streaming gRPC call is one where the two streams (sending
>> and receiving) are totally independent of each other. Messages sent by one
>> end will be received by the other end in the order in which they were sent.
>> But there can be no guarantees on the order of messages across the streams.
>> If you want to impose ordering between the sending and receiving, it has to
>> be done at the application layer. For example, instead of sending all five
>> messages from one goroutine and reading all responses in another, you could
>> use a single goroutine to send and receive messages in order.
>>
>> On Thursday, August 27, 2020 at 10:58:05 AM UTC-7 [email protected]
>> wrote:
>>
>>> I'm calling a bi-directional streaming service with five messages which
>>> need to be sent in sequence.
>>> For example pbx.ClientMsg_Sub (Id=3) should be sent and *completed*
>>> before pbx.ClientMsg_Set (Id=4).
>>>
>>> I notice that if I don't delay the sending of each message, I don't get
>>> the expected response.
>>>
>>> for _, req := range requests {
>>>     fmt.Printf("Sending message: %v\n", req)
>>>     stream.Send(req)
>>>     time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>>> }
>>>
>>> My full code is shared below.
>>>
>>> func getUserByCUID(w http.ResponseWriter, req *http.Request) {
>>>     enableCors(&w)
>>>
>>>     crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
>>>     creds, err := credentials.NewClientTLSFromFile(crtFile, "")
>>>     if err != nil {
>>>         log.Fatal("Error loading cert", err)
>>>     }
>>>
>>>     conn, err := grpc.Dial("dev.mhub.my:16060", grpc.WithTransportCredentials(creds))
>>>     if err != nil {
>>>         log.Fatal("Error dialing", err)
>>>     }
>>>
>>>     c := pbx.NewNodeClient(conn)
>>>     // we create a stream by invoking the client
>>>     stream, err := c.MessageLoop(context.Background())
>>>     if err != nil {
>>>         log.Fatalf("Error while creating stream: %v", err)
>>>         return
>>>     }
>>>
>>>     // get user's cognito user id
>>>     cuID, ok := req.URL.Query()["cid"]
>>>
>>>     if !ok || len(cuID[0]) < 1 {
>>>         log.Println("Url Param 'cid' is missing")
>>>         return
>>>     }
>>>     tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
>>>     resp := Response{"n/a"}
>>>
>>>     requests := []*pbx.ClientMsg{
>>>         &pbx.ClientMsg{
>>>             Message: &pbx.ClientMsg_Hi{
>>>                 Hi: &pbx.ClientHi{
>>>                     Id: "1",
>>>                     UserAgent: "Golang_Spider_Bot/3.0",
>>>                     Ver: "0.15",
>>>                     Lang: "EN",
>>>                 }},
>>>         },
>>>         &pbx.ClientMsg{
>>>             Message: &pbx.ClientMsg_Login{
>>>                 Login: &pbx.ClientLogin{
>>>                     Id: "2",
>>>                     Scheme: "basic",
>>>                     Secret: []byte("carol:carol123"),
>>>                 }},
>>>         },
>>>         &pbx.ClientMsg{
>>>             Message: &pbx.ClientMsg_Sub{
>>>                 Sub: &pbx.ClientSub{
>>>                     Id: "3",
>>>                     Topic: "fnd",
>>>                     GetQuery: &pbx.GetQuery{
>>>                         What: "sub",
>>>                     },
>>>                 },
>>>             },
>>>         },
>>>         &pbx.ClientMsg{
>>>             Message: &pbx.ClientMsg_Set{
>>>                 Set: &pbx.ClientSet{
>>>                     Id: "4",
>>>                     Topic: "fnd",
>>>                     Query: &pbx.SetQuery{
>>>                         Desc: &pbx.SetDesc{
>>>                             Public: []byte(tag),
>>>                         },
>>>                     },
>>>                 },
>>>             },
>>>         },
>>>         &pbx.ClientMsg{
>>>             Message: &pbx.ClientMsg_Get{
>>>                 Get: &pbx.ClientGet{
>>>                     Id: "5",
>>>                     Topic: "fnd",
>>>                     Query: &pbx.GetQuery{
>>>                         What: "sub",
>>>                     },
>>>                 }},
>>>         },
>>>     }
>>>
>>>     waitc := make(chan struct{})
>>>     // we send a bunch of messages to the client (go routine)
>>>     go func() {
>>>         // function to send a bunch of messages
>>>         for _, req := range requests {
>>>             fmt.Printf("Sending message: %v\n", req)
>>>             stream.Send(req)
>>>             time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>>>         }
>>>         stream.CloseSend()
>>>     }()
>>>     // we receive a bunch of messages from the client (go routine)
>>>     // count := 0
>>>     go func() {
>>>         // function to receive a bunch of messages
>>>         uid := "n/a"
>>>         for {
>>>             res, err := stream.Recv()
>>>             if err == io.EOF {
>>>                 resp.TinodeUserID = uid
>>>                 js, err := json.Marshal(resp)
>>>                 if err != nil {
>>>                     http.Error(w, err.Error(), http.StatusInternalServerError)
>>>                     return
>>>                 }
>>>
>>>                 w.Header().Set("Content-Type", "application/json")
>>>                 w.Write(js)
>>>                 break
>>>             }
>>>             if err != nil {
>>>                 log.Fatalf("Error while receiving: %v", err)
>>>                 break
>>>             }
>>>             if res.GetMeta() != nil {
>>>                 if res.GetMeta().Id == "5" {
>>>                     sub := res.GetMeta().GetSub()
>>>                     for _, elem := range sub {
>>>                         uid = elem.UserId
>>>                     }
>>>                 }
>>>             }
>>>         }
>>>         close(waitc)
>>>     }()
>>>
>>>     // block until everything is done
>>>     <-waitc
>>> }
>>>
>>> Response when operation is paused before sending the next message:
>>> {"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result
>>>
>>> Response when operation is not paused before sending the next message:
>>> {"tinodeuserid":"n/a"}
>>>
>>> What is the right way to make sure the messages are sent in sequence?
>>> My objective is to send message n+1 after message n completes (something
>>> like using await in javascript).
>>>
>>> Thank you.
>>>
>>> --
>> You received this message because you are subscribed to the Google Groups
>> "grpc.io" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To view this discussion on the web visit
>> https://groups.google.com/d/msgid/grpc-io/d9d377dc-63aa-4b99-8d79-cf49c15cb75dn%40googlegroups.com.
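
For reference, the single-goroutine approach suggested in the thread (send a message, wait for its reply, then send the next) could look roughly like the sketch below. It is only an illustration under stated assumptions: Msg, Stream, sendInLockstep and fakeStream are hypothetical names, not part of gRPC or of the pbx package, and the fake stream stands in for a real server by acknowledging each request with the same ID. It also assumes the server echoes the request ID in its reply, which the original code's check on res.GetMeta().Id hints at but does not confirm for every message type.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Msg is a hypothetical stand-in for the request/response types
// (pbx.ClientMsg and the server message in the original code).
type Msg struct {
	ID   string
	Body string
}

// Stream is a hypothetical subset of a bidirectional client stream:
// only the three calls the original code uses.
type Stream interface {
	Send(*Msg) error
	Recv() (*Msg, error)
	CloseSend() error
}

// sendInLockstep sends each request and blocks until a reply carrying
// the same ID arrives before sending the next one. CloseSend is called
// only after the last reply has been read, so the receive side is never
// cut off early.
func sendInLockstep(s Stream, reqs []*Msg) ([]*Msg, error) {
	replies := make([]*Msg, 0, len(reqs))
	for _, req := range reqs {
		if err := s.Send(req); err != nil {
			return replies, fmt.Errorf("send %s: %w", req.ID, err)
		}
		for {
			res, err := s.Recv()
			if err != nil {
				return replies, fmt.Errorf("recv for %s: %w", req.ID, err)
			}
			if res.ID == req.ID {
				replies = append(replies, res)
				break
			}
			// Messages with other IDs are skipped in this sketch.
		}
	}
	return replies, s.CloseSend()
}

// fakeStream is an in-memory stand-in for a server: every Send queues
// an acknowledgement with the same ID, which Recv then returns.
type fakeStream struct {
	pending []*Msg
	closed  bool
}

func (f *fakeStream) Send(m *Msg) error {
	if f.closed {
		return errors.New("send on closed stream")
	}
	f.pending = append(f.pending, &Msg{ID: m.ID, Body: "ack:" + m.Body})
	return nil
}

func (f *fakeStream) Recv() (*Msg, error) {
	if len(f.pending) == 0 {
		return nil, io.EOF
	}
	m := f.pending[0]
	f.pending = f.pending[1:]
	return m, nil
}

func (f *fakeStream) CloseSend() error {
	f.closed = true
	return nil
}

func main() {
	reqs := []*Msg{{ID: "1", Body: "hi"}, {ID: "2", Body: "login"}, {ID: "3", Body: "sub"}}
	replies, err := sendInLockstep(&fakeStream{}, reqs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, r := range replies {
		fmt.Println(r.ID, r.Body)
	}
}
```

With a real stream, the same loop would replace both goroutines and the time.Sleep call in the original handler. The trade-off is that requests are no longer pipelined, which is acceptable here because each message depends on the previous one having completed.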
