I'm calling a bi-directional streaming service with five messages that 
need to be sent in sequence.  
For example pbx.ClientMsg_Sub (Id=3) should be sent and *completed* before 
pbx.ClientMsg_Set(Id=4).

I notice that if I don't delay the sending of each message, I don't get the 
expected response. 
        for _, req := range requests {
            fmt.Printf("Sending message: %v\n", req)
            // Send reports transport-level failures only; stop on the first
            // one instead of pushing further messages into a broken stream.
            if err := stream.Send(req); err != nil {
                log.Printf("Error while sending: %v", err)
                break
            }
            time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
        }

My full code is shared below.

func getUserByCUID(w http.ResponseWriter, req *http.Request) {
   enableCors(&w)

    crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
   creds, err := credentials.NewClientTLSFromFile(crtFile, "")
   if err != nil {
       log.Fatal("Error loading cert", err)
   }

    conn, err := grpc.Dial("dev.mhub.my:16060", grpc.
WithTransportCredentials(creds))
   if err != nil {
       log.Fatal("Error dialing", err)
   }

    c := pbx.NewNodeClient(conn)
   // we create a stream by invoking the client
   stream, err := c.MessageLoop(context.Background())
   if err != nil {
       log.Fatalf("Error while creating stream: %v", err)
       return
   }

    // get user's cognito user id
   cuID, ok := req.URL.Query()["cid"]

    if !ok || len(cuID[0]) < 1 {
       log.Println("Url Param 'cid' is missing")
       return
   }
   tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
   resp := Response{"n/a"}

    requests := []*pbx.ClientMsg{
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Hi{
               Hi: &pbx.ClientHi{
                   Id:        "1",
                   UserAgent: "Golang_Spider_Bot/3.0",
                   Ver:       "0.15",
                   Lang:      "EN",
               }},
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Login{
               Login: &pbx.ClientLogin{
                   Id:     "2",
                   Scheme: "basic",
                   Secret: []byte("carol:carol123"),
               }},
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Sub{
               Sub: &pbx.ClientSub{
                   Id:    "3",
                   Topic: "fnd",
                   GetQuery: &pbx.GetQuery{
                       What: "sub",
                   },
               },
           },
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Set{
               Set: &pbx.ClientSet{
                   Id:    "4",
                   Topic: "fnd",
                   Query: &pbx.SetQuery{
                       Desc: &pbx.SetDesc{
                           Public: []byte(tag),
                       },
                   },
               },
           },
       },
       &pbx.ClientMsg{
           Message: &pbx.ClientMsg_Get{
               Get: &pbx.ClientGet{
                   Id:    "5",
                   Topic: "fnd",
                   Query: &pbx.GetQuery{
                       What: "sub",
                   },
               }},
       },
   }

    waitc := make(chan struct{})
   // we send a bunch of messages to the client (go routine)
   go func() {
       // function to send a bunch of messages
       for _, req := range requests {
           fmt.Printf("Sending message: %v\n", req)
           stream.Send(req)
           time.Sleep(500 * time.Millisecond) // doesn't run in sequence if 
removed
       }
       stream.CloseSend()
   }()
   // we receive a bunch of messages from the client (go routine)
   // count := 0
   go func() {
       // function to receive a bunch of messages
       uid := "n/a"
       for {
           res, err := stream.Recv()
           if err == io.EOF {
               resp.TinodeUserID = uid
               js, err := json.Marshal(resp)
               if err != nil {
                   http.Error(w, err.Error(), http.StatusInternalServerError
)
                   return
               }

                w.Header().Set("Content-Type", "application/json")
               w.Write(js)
               break
           }
           if err != nil {
               log.Fatalf("Error while receiving: %v", err)
               break
           }
           if res.GetMeta() != nil {
               if res.GetMeta().Id == "5" {
                   sub := res.GetMeta().GetSub()
                   for _, elem := range sub {
                       uid = elem.UserId
                   }
               }
           }
       }
       close(waitc)
   }()

    // block until everything is done
   <-waitc
}



Response when operation is paused before sending the next message:
{"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result

Response when operation is not paused before sending the next message:
{"tinodeuserid":"n/a"}


What is the right way to make sure the messages are sent in sequence?
My objective is to send message n+1 only after message n completes (similar 
to using await in JavaScript).

Thank you. 


-- 
You received this message because you are subscribed to the Google Groups 
"grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/grpc-io/dabd0959-93a4-4e15-ab22-37c8d35bb48bo%40googlegroups.com.

Reply via email to