I'm calling a bi-directional streaming service with five messages that
need to be sent in sequence.
For example, pbx.ClientMsg_Sub (Id=3) should be sent and *completed* before
pbx.ClientMsg_Set (Id=4).
I notice that if I don't delay the sending of each message, I don't get the
expected response.
for _, req := range requests {
fmt.Printf("Sending message: %v\n", req)
stream.Send(req)
time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
}
My full code is shared below.
func getUserByCUID(w http.ResponseWriter, req *http.Request) {
enableCors(&w)
crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
creds, err := credentials.NewClientTLSFromFile(crtFile, "")
if err != nil {
log.Fatal("Error loading cert", err)
}
conn, err := grpc.Dial("dev.mhub.my:16060", grpc.WithTransportCredentials(creds))
if err != nil {
log.Fatal("Error dialing", err)
}
c := pbx.NewNodeClient(conn)
// we create a stream by invoking the client
stream, err := c.MessageLoop(context.Background())
if err != nil {
log.Fatalf("Error while creating stream: %v", err)
return
}
// get user's cognito user id
cuID, ok := req.URL.Query()["cid"]
if !ok || len(cuID[0]) < 1 {
log.Println("Url Param 'cid' is missing")
return
}
tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
resp := Response{"n/a"}
requests := []*pbx.ClientMsg{
&pbx.ClientMsg{
Message: &pbx.ClientMsg_Hi{
Hi: &pbx.ClientHi{
Id: "1",
UserAgent: "Golang_Spider_Bot/3.0",
Ver: "0.15",
Lang: "EN",
}},
},
&pbx.ClientMsg{
Message: &pbx.ClientMsg_Login{
Login: &pbx.ClientLogin{
Id: "2",
Scheme: "basic",
Secret: []byte("carol:carol123"),
}},
},
&pbx.ClientMsg{
Message: &pbx.ClientMsg_Sub{
Sub: &pbx.ClientSub{
Id: "3",
Topic: "fnd",
GetQuery: &pbx.GetQuery{
What: "sub",
},
},
},
},
&pbx.ClientMsg{
Message: &pbx.ClientMsg_Set{
Set: &pbx.ClientSet{
Id: "4",
Topic: "fnd",
Query: &pbx.SetQuery{
Desc: &pbx.SetDesc{
Public: []byte(tag),
},
},
},
},
},
&pbx.ClientMsg{
Message: &pbx.ClientMsg_Get{
Get: &pbx.ClientGet{
Id: "5",
Topic: "fnd",
Query: &pbx.GetQuery{
What: "sub",
},
}},
},
}
waitc := make(chan struct{})
// we send a bunch of messages to the server (goroutine)
go func() {
// function to send a bunch of messages
for _, req := range requests {
fmt.Printf("Sending message: %v\n", req)
stream.Send(req)
time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
}
stream.CloseSend()
}()
// we receive a bunch of messages from the server (goroutine)
// count := 0
go func() {
// function to receive a bunch of messages
uid := "n/a"
for {
res, err := stream.Recv()
if err == io.EOF {
resp.TinodeUserID = uid
js, err := json.Marshal(resp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(js)
break
}
if err != nil {
log.Fatalf("Error while receiving: %v", err)
break
}
if res.GetMeta() != nil {
if res.GetMeta().Id == "5" {
sub := res.GetMeta().GetSub()
for _, elem := range sub {
uid = elem.UserId
}
}
}
}
close(waitc)
}()
// block until everything is done
<-waitc
}
Response when operation is paused before sending the next message:
{"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result
Response when operation is not paused before sending the next message:
{"tinodeuserid":"n/a"}
What is the right way to make sure the messages are sent in sequence?
My objective is to send message n+1 only after message n completes (something
like using await in JavaScript).
Thank you.