A bidirectional streaming gRPC call is one where the two streams (sending 
and receiving) are totally independent of each other. Messages sent by one 
end will be received by the other end in the order in which it was sent. 
But there can be no guarantees on the order of messages across the streams. 
If you want to impose ordering between the sending and receiving, it has to 
be done at the application layer. For example, instead of sending all five 
messages from one goroutine and reading all responses in another, you could 
use a single goroutine to send and receive messages in order.

On Thursday, August 27, 2020 at 10:58:05 AM UTC-7 [email protected] 
wrote:

>
> I'm calling a bi-directional streaming service with five messages which 
> needs to be sent in sequence.  
> For example pbx.ClientMsg_Sub (Id=3) should be sent and *completed* 
> before pbx.ClientMsg_Set(Id=4).
>
> I notice that if I don't delay the sending of each message, I don't get 
> the expected response. 
>         for _, req := range requests {
>            fmt.Printf("Sending message: %v\n", req)
>            stream.Send(req)
>            time.Sleep(500 * time.Millisecond) // doesn't run in sequence 
> if removed
>        }
>
> My full code is shared below.
>
> func getUserByCUID(w http.ResponseWriter, req *http.Request) {
>    enableCors(&w)
>
>     crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
>    creds, err := credentials.NewClientTLSFromFile(crtFile, "")
>    if err != nil {
>        log.Fatal("Error loading cert", err)
>    }
>
>     conn, err := grpc.Dial("dev.mhub.my:16060", grpc.
> WithTransportCredentials(creds))
>    if err != nil {
>        log.Fatal("Error dialing", err)
>    }
>
>     c := pbx.NewNodeClient(conn)
>    // we create a stream by invoking the client
>    stream, err := c.MessageLoop(context.Background())
>    if err != nil {
>        log.Fatalf("Error while creating stream: %v", err)
>        return
>    }
>
>     // get user's cognito user id
>    cuID, ok := req.URL.Query()["cid"]
>
>     if !ok || len(cuID[0]) < 1 {
>        log.Println("Url Param 'cid' is missing")
>        return
>    }
>    tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
>    resp := Response{"n/a"}
>
>     requests := []*pbx.ClientMsg{
>        &pbx.ClientMsg{
>            Message: &pbx.ClientMsg_Hi{
>                Hi: &pbx.ClientHi{
>                    Id:        "1",
>                    UserAgent: "Golang_Spider_Bot/3.0",
>                    Ver:       "0.15",
>                    Lang:      "EN",
>                }},
>        },
>        &pbx.ClientMsg{
>            Message: &pbx.ClientMsg_Login{
>                Login: &pbx.ClientLogin{
>                    Id:     "2",
>                    Scheme: "basic",
>                    Secret: []byte("carol:carol123"),
>                }},
>        },
>        &pbx.ClientMsg{
>            Message: &pbx.ClientMsg_Sub{
>                Sub: &pbx.ClientSub{
>                    Id:    "3",
>                    Topic: "fnd",
>                    GetQuery: &pbx.GetQuery{
>                        What: "sub",
>                    },
>                },
>            },
>        },
>        &pbx.ClientMsg{
>            Message: &pbx.ClientMsg_Set{
>                Set: &pbx.ClientSet{
>                    Id:    "4",
>                    Topic: "fnd",
>                    Query: &pbx.SetQuery{
>                        Desc: &pbx.SetDesc{
>                            Public: []byte(tag),
>                        },
>                    },
>                },
>            },
>        },
>        &pbx.ClientMsg{
>            Message: &pbx.ClientMsg_Get{
>                Get: &pbx.ClientGet{
>                    Id:    "5",
>                    Topic: "fnd",
>                    Query: &pbx.GetQuery{
>                        What: "sub",
>                    },
>                }},
>        },
>    }
>
>     waitc := make(chan struct{})
>    // we send a bunch of messages to the client (go routine)
>    go func() {
>        // function to send a bunch of messages
>        for _, req := range requests {
>            fmt.Printf("Sending message: %v\n", req)
>            stream.Send(req)
>            time.Sleep(500 * time.Millisecond) // doesn't run in sequence 
> if removed
>        }
>        stream.CloseSend()
>    }()
>    // we receive a bunch of messages from the client (go routine)
>    // count := 0
>    go func() {
>        // function to receive a bunch of messages
>        uid := "n/a"
>        for {
>            res, err := stream.Recv()
>            if err == io.EOF {
>                resp.TinodeUserID = uid
>                js, err := json.Marshal(resp)
>                if err != nil {
>                    http.Error(w, err.Error(), http.
> StatusInternalServerError)
>                    return
>                }
>
>                 w.Header().Set("Content-Type", "application/json")
>                w.Write(js)
>                break
>            }
>            if err != nil {
>                log.Fatalf("Error while receiving: %v", err)
>                break
>            }
>            if res.GetMeta() != nil {
>                if res.GetMeta().Id == "5" {
>                    sub := res.GetMeta().GetSub()
>                    for _, elem := range sub {
>                        uid = elem.UserId
>                    }
>                }
>            }
>        }
>        close(waitc)
>    }()
>
>     // block until everything is done
>    <-waitc
> }
>
>
>
> Response when operation is paused before sending the next message:
> {"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result
>
> Response when operation is not paused before sending the next message:
> {"tinodeuserid":"n/a"}
>
>
> What is the right way to make sure the messages are sent in sequence?
> My objective is to send message n+1 after message n completes (something 
> like using await in javascript).
>
> Thank you. 
>
>
>

-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/d9d377dc-63aa-4b99-8d79-cf49c15cb75dn%40googlegroups.com.

Reply via email to