I'm new to Cap'n Proto and was experimenting with bidirectional streams. At 
a high level, what I'd like to do is, in sequence:

1. Client streams some chunks to the server.
2. Client tells the server it is done with its stream.
3. Server streams back some chunks to the client.
4. Server tells the client it is done with its stream.

Based on a post from a few days ago, I have (1) and (2) working, but was 
trying to figure out how to do (3) and (4). (3) happens in response to (2).

I'm using this IDL for illustration:

interface StreamService {
  reqReply @0 (replySvc :ReplyCallback) -> (reqSvc :RequestCallback);
  
  interface RequestCallback {
    sendChunk @0 (chunk :Text) -> stream;
    done @1 ();
  } 
  
  interface ReplyCallback {
    sendChunk @0 (chunk :Text) -> stream;
    done @1 ();
  }
}

And the server is implemented as:

struct ReqSvcImpl final: public StreamService::RequestCallback::Server {

    explicit ReqSvcImpl(StreamService::ReplyCallback::Client replySvc) :
        _replySvc(replySvc) {}

    kj::Promise<void>
    sendChunk(SendChunkContext context) override
    {
        auto chunk = context.getParams().getChunk();
        std::cout << "Received \"" << chunk.cStr() << "\"" << std::endl;
        _vec.push_back(std::string(chunk.cStr()));
        return kj::READY_NOW;
    }

    kj::Promise<void>
    done(DoneContext context) override
    {
        std::cout << "done called:" << std::endl;
        for (const auto str : _vec) {
            std::cout << "  " << str << std::endl;
        }

        return kj::READY_NOW;
    }

private:

    StreamService::ReplyCallback::Client _replySvc;
    std::vector<std::string> _vec;
};

struct StreamServiceImpl final: public StreamService::Server {
    kj::Promise<void>
    reqReply(ReqReplyContext context) override
    {
        auto replySvc = context.getParams().getReplySvc();
        context.getResults().setReqSvc(kj::heap<ReqSvcImpl>(replySvc));
        return kj::READY_NOW;
    }
};

On the client, I followed the pattern mentioned on an earlier post using 
recursive promises to send the stream to the server:

struct ReqSvcImpl final: public StreamService::RequestCallback::Server {
    // Similar to ReqSvcImpl on the server, with a sendChunk/done function 
that prints the chunks
};

static kj::Promise<void>
sendStrings(StreamService::RequestCallback::Client stream, uint32_t strNum,
            uint32_t stopNum)
{
    if (strNum == stopNum) {
        return stream.doneRequest().send().ignoreResult();
    }

    std::string str("Test " + std::to_string(strNum));
    auto chunkReq = stream.sendChunkRequest();
    chunkReq.setChunk(str.c_str());
    return chunkReq.send().then([stream=kj::mv(stream),
                                strNum, stopNum]() mutable {
            return sendStrings(kj::mv(stream), strNum + 1, stopNum);
        });
}


main(int argc, const char* argv[])
...
    auto req = svc.reqReplyRequest();
    auto reqSvc = req.send().getReqSvc();
    auto promise = sendStrings(reqSvc, 1, 11);
    promise.wait(waitScope);

I'm trying to figure out what I should do/return in the server's 
ReqSvcImpl::done() 
function and what, if anything, I need to do different on the client to 
wait on the server's stream to be received and finish.

I'm assuming ReqSvcImpl::done() needs to return a different promise object, but 
not sure exactly what that promise object should look like or how it 
incorporates 
the sendChunkRequest/doneRequest calls back to the client.

Thanks,

Matt

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/ecceef56-1e16-4e41-b0e3-be1a882398a7%40googlegroups.com.

Reply via email to