>> So you don't have to pause in between requests, just pause at the end of sending before calling CloseSend.

Thank you, this solved my issue. 

>> Also, messages in a single direction are received in order. This is 
guaranteed by TCP.

My gRPC server is defined as bidirectional.
Does your statement mean I can still send messages in a single direction
even though the RPC is defined as bi-di? I assumed it had to be a stream.
I would appreciate some guidance.
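
A bidirectional stream is two independent streams, one per direction, so the ordering guarantee applies to each direction on its own. If message n+1 must wait for the reply to message n, one option (the single-goroutine approach suggested earlier in this thread) is to send and receive in lockstep, and to call CloseSend only after the last reply has been read. The sketch below is only an illustration under stated assumptions: msgStream, fakeStream and sendInLockstep are hypothetical names, plain strings stand in for the pbx messages, and the fake server sends exactly one reply per request, which may not hold for the real service.

```go
package main

import (
	"fmt"
	"io"
)

// msgStream is a hypothetical stand-in for the generated bidi client stream.
// The real stream exchanges pbx messages; strings keep the sketch small.
type msgStream interface {
	Send(req string) error
	Recv() (string, error)
	CloseSend() error
}

// fakeStream is an in-memory fake server (assumption: one reply per request).
// After CloseSend, Recv returns io.EOF once all queued replies are read.
type fakeStream struct {
	replies chan string
}

func newFakeStream() *fakeStream {
	return &fakeStream{replies: make(chan string, 16)}
}

func (f *fakeStream) Send(req string) error {
	f.replies <- "reply:" + req
	return nil
}

func (f *fakeStream) Recv() (string, error) {
	r, ok := <-f.replies
	if !ok {
		return "", io.EOF
	}
	return r, nil
}

func (f *fakeStream) CloseSend() error {
	close(f.replies)
	return nil
}

// sendInLockstep sends each request and waits for its reply before sending
// the next one. It half-closes the stream only after the last reply is read.
func sendInLockstep(s msgStream, requests []string) ([]string, error) {
	var replies []string
	for _, req := range requests {
		if err := s.Send(req); err != nil {
			return replies, fmt.Errorf("send %q: %w", req, err)
		}
		// Block here until the reply to this request arrives.
		rep, err := s.Recv()
		if err != nil {
			return replies, fmt.Errorf("recv after %q: %w", req, err)
		}
		replies = append(replies, rep)
	}
	if err := s.CloseSend(); err != nil {
		return replies, err
	}
	// Drain the receive side until the server ends the stream.
	for {
		_, err := s.Recv()
		if err == io.EOF {
			return replies, nil
		}
		if err != nil {
			return replies, err
		}
	}
}

func main() {
	replies, err := sendInLockstep(newFakeStream(), []string{"1", "2", "3", "4", "5"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(replies) // prints [reply:1 reply:2 reply:3 reply:4 reply:5]
}
```

With the real client, the same loop would presumably call Send and Recv on the stream returned by MessageLoop, and would likely match replies by their Id field (as the receiver in the original code does for Id "5") rather than assume one reply per request.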


On Thursday, 17 September 2020 at 21:22:40 UTC+8 [email protected] wrote:

> I think the problem in this code is caused by the sender ending too soon. 
> When the sender calls CloseSend, the server ends the other side of the 
> stream as well, which causes the receiver to get an EOF and fail to 
> receive the desired response. So you don't have to pause in between 
> requests; just pause at the end of sending, before calling CloseSend.
>
> Also, messages in a single direction are received in order. This is 
> guaranteed by TCP.
>
> On Thu, Sep 17, 2020 at 4:52 AM 'Easwar Swaminathan' via grpc.io <
> [email protected]> wrote:
>
>> A bidirectional streaming gRPC call is one where the two streams (sending 
>> and receiving) are totally independent of each other. Messages sent by one 
>> end will be received by the other end in the order in which they were sent. 
>> But there can be no guarantees on the order of messages across the streams. 
>> If you want to impose ordering between the sending and receiving, it has to 
>> be done at the application layer. For example, instead of sending all five 
>> messages from one goroutine and reading all responses in another, you could 
>> use a single goroutine to send and receive messages in order.
>>
>> On Thursday, August 27, 2020 at 10:58:05 AM UTC-7 [email protected] 
>> wrote:
>>
>>>
>>> I'm calling a bi-directional streaming service with five messages which 
>>> need to be sent in sequence.  
>>> For example pbx.ClientMsg_Sub (Id=3) should be sent and *completed* 
>>> before pbx.ClientMsg_Set(Id=4).
>>>
>>> I notice that if I don't delay the sending of each message, I don't get 
>>> the expected response. 
>>>        for _, req := range requests {
>>>            fmt.Printf("Sending message: %v\n", req)
>>>            stream.Send(req)
>>>            time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>>>        }
>>>
>>> My full code is shared below.
>>>
>>> func getUserByCUID(w http.ResponseWriter, req *http.Request) {
>>>    enableCors(&w)
>>>
>>>     crtFile := "/home/yogesnsamy/Coding/MHub/prod_cert/cert.crt"
>>>    creds, err := credentials.NewClientTLSFromFile(crtFile, "")
>>>    if err != nil {
>>>        log.Fatal("Error loading cert", err)
>>>    }
>>>
>>>    conn, err := grpc.Dial("dev.mhub.my:16060", grpc.WithTransportCredentials(creds))
>>>    if err != nil {
>>>        log.Fatal("Error dialing", err)
>>>    }
>>>
>>>     c := pbx.NewNodeClient(conn)
>>>    // we create a stream by invoking the client
>>>    stream, err := c.MessageLoop(context.Background())
>>>    if err != nil {
>>>        log.Fatalf("Error while creating stream: %v", err)
>>>        return
>>>    }
>>>
>>>     // get user's cognito user id
>>>    cuID, ok := req.URL.Query()["cid"]
>>>
>>>     if !ok || len(cuID[0]) < 1 {
>>>        log.Println("Url Param 'cid' is missing")
>>>        return
>>>    }
>>>    tag := fmt.Sprintf(`"%v"`, cuID[0]) // mhub
>>>    resp := Response{"n/a"}
>>>
>>>     requests := []*pbx.ClientMsg{
>>>        &pbx.ClientMsg{
>>>            Message: &pbx.ClientMsg_Hi{
>>>                Hi: &pbx.ClientHi{
>>>                    Id:        "1",
>>>                    UserAgent: "Golang_Spider_Bot/3.0",
>>>                    Ver:       "0.15",
>>>                    Lang:      "EN",
>>>                }},
>>>        },
>>>        &pbx.ClientMsg{
>>>            Message: &pbx.ClientMsg_Login{
>>>                Login: &pbx.ClientLogin{
>>>                    Id:     "2",
>>>                    Scheme: "basic",
>>>                    Secret: []byte("carol:carol123"),
>>>                }},
>>>        },
>>>        &pbx.ClientMsg{
>>>            Message: &pbx.ClientMsg_Sub{
>>>                Sub: &pbx.ClientSub{
>>>                    Id:    "3",
>>>                    Topic: "fnd",
>>>                    GetQuery: &pbx.GetQuery{
>>>                        What: "sub",
>>>                    },
>>>                },
>>>            },
>>>        },
>>>        &pbx.ClientMsg{
>>>            Message: &pbx.ClientMsg_Set{
>>>                Set: &pbx.ClientSet{
>>>                    Id:    "4",
>>>                    Topic: "fnd",
>>>                    Query: &pbx.SetQuery{
>>>                        Desc: &pbx.SetDesc{
>>>                            Public: []byte(tag),
>>>                        },
>>>                    },
>>>                },
>>>            },
>>>        },
>>>        &pbx.ClientMsg{
>>>            Message: &pbx.ClientMsg_Get{
>>>                Get: &pbx.ClientGet{
>>>                    Id:    "5",
>>>                    Topic: "fnd",
>>>                    Query: &pbx.GetQuery{
>>>                        What: "sub",
>>>                    },
>>>                }},
>>>        },
>>>    }
>>>
>>>     waitc := make(chan struct{})
>>>    // we send a bunch of messages to the server (goroutine)
>>>    go func() {
>>>        // function to send a bunch of messages
>>>        for _, req := range requests {
>>>            fmt.Printf("Sending message: %v\n", req)
>>>            stream.Send(req)
>>>            time.Sleep(500 * time.Millisecond) // doesn't run in sequence if removed
>>>        }
>>>        stream.CloseSend()
>>>    }()
>>>    // we receive a bunch of messages from the server (goroutine)
>>>    // count := 0
>>>    go func() {
>>>        // function to receive a bunch of messages
>>>        uid := "n/a"
>>>        for {
>>>            res, err := stream.Recv()
>>>            if err == io.EOF {
>>>                resp.TinodeUserID = uid
>>>                js, err := json.Marshal(resp)
>>>                if err != nil {
>>>                    http.Error(w, err.Error(), http.StatusInternalServerError)
>>>                    return
>>>                }
>>>
>>>                 w.Header().Set("Content-Type", "application/json")
>>>                w.Write(js)
>>>                break
>>>            }
>>>            if err != nil {
>>>                log.Fatalf("Error while receiving: %v", err)
>>>                break
>>>            }
>>>            if res.GetMeta() != nil {
>>>                if res.GetMeta().Id == "5" {
>>>                    sub := res.GetMeta().GetSub()
>>>                    for _, elem := range sub {
>>>                        uid = elem.UserId
>>>                    }
>>>                }
>>>            }
>>>        }
>>>        close(waitc)
>>>    }()
>>>
>>>     // block until everything is done
>>>    <-waitc
>>> }
>>>
>>>
>>>
>>> Response when operation is paused before sending the next message:
>>> {"tinodeuserid":"usrNlWi3eOr74w"} // getting the expected result
>>>
>>> Response when operation is not paused before sending the next message:
>>> {"tinodeuserid":"n/a"}
>>>
>>>
>>> What is the right way to make sure the messages are sent in sequence?
>>> My objective is to send message n+1 after message n completes (something 
>>> like using await in javascript).
>>>
>>> Thank you. 
>>>
>>>
>>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "grpc.io" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/grpc-io/d9d377dc-63aa-4b99-8d79-cf49c15cb75dn%40googlegroups.com
>>  
>> <https://groups.google.com/d/msgid/grpc-io/d9d377dc-63aa-4b99-8d79-cf49c15cb75dn%40googlegroups.com?utm_medium=email&utm_source=footer>
>> .
>>
>

