In your send function you should 
request.send().detach([&](kj::Exception&& error) { /*handle returned 
error*/; });
If you don't the promise is destructed and the call gets cancelled.

On Friday, May 6, 2022 at 1:22:50 PM UTC+2 Gyorgy Miru wrote:

> Hi all,
>
> I am trying to implement a simple architecture where various backend 
> servers gather data (on Android devices) and push them to the subscribers. 
> In the backend the data gatherer runs is a separate thread from the server. 
> My current issue is when messages are sent to the subscriber, they are not 
> received on the client side. I am not sure if I need to wait on the promise 
> that is returned by the send() call in order to actually place the message 
> on the wire or maybe the problem is completely unrelated.
>
> First, I've tried using executor->executeSync(), from the gatherer 
> thread, to the send the messages, but then I can't wait on the promise 
> because the code runs in an event callback.
> I've also tried to start a kj event loop in the gatherer thread and 
> publish the messages from there, but then when  I try to wait I get the 
> following exception " Nothing to wait for; this thread would hang forever."
> Please find the relevant code samples below:
>
> *test.capnp*
>
> @0xf84f5237f35aec13;
>
> interface Proxy {
>    getBackend @0 () -> (backend: Backend);
> }
>
> interface Backend {
>    consume @0 (msg :Message);
>    subscribe @1 (receiver :Receiver);
> }
>
> interface Receiver {
>    receive @0 (msg: Message);
> }
>
> struct Message {
>    body @0 :Data;
> }
>
> *backend.cpp*
>
> class BackendImpl final: public Backend::Server {
>     private:
>         bool first_received;
>         bool receiver_ready;
>         unsigned recv_count;
>         unsigned count;
>         unsigned size;
>         chrono::time_point<chrono::high_resolution_clock> begin;
>         Receiver::Client receiver;
>     
>     public:
>         BackendImpl(unsigned cnt): first_received(false), 
> receiver_ready(false), count(cnt), receiver(nullptr) {};
>         kj::Promise<void> consume(ConsumeContext context) override {
>             //... Benchmark tests are omitted
>             return kj::READY_NOW;
>         }
>
>         kj::Promise<void> subscribe(SubscribeContext context) override {
>             this->receiver = context.getParams().getReceiver();
>             cout << "Receiver received" << endl;
>             this->receiver_ready = true;
>             return kj::READY_NOW;
>         }
>
>         //bool send(int cnt, int size, kj::WaitScope& ws) { // I have 
> tried passing the wait scope
>         bool send(int cnt, int size) {
>             if (!this->receiver_ready) {
>                 return false;
>             }
>             this->receiver_ready = false;
>             cout << "Sending data" << endl;
>             unsigned char* msg = new unsigned char[size];
>             memset(msg, 'a', size);
>
>
>             for (int i = 0; i < this->count; i++) {
>                 
>                 auto request = this->receiver.receiveRequest();
>                 request.getMsg().setBody(capnp::Data::Reader(msg, size));
>                 auto promise = request.send();
>                 //auto promise = request.send().wait(ws);
>
>                 auto res = 
> promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){
>                     cout << result.totalSize().wordCount << endl << flush; 
> // This never gets called
>                     return result.totalSize().wordCount;
>                     });
>
>             }
>             delete[] msg;
>             cout << "Sending done" << endl << flush;
>             return true;
>         }
> };
>
> BackendImpl* backend;
> void data_pusher(const kj::Executor* executor, int size, int cnt) {
>
>     //kj::EventLoop loop;
>     //loop.run();
>     //kj::WaitScope waitScope(loop);
>     while (executor->isLive()) {
>         this_thread::sleep_for(1s);
>
>         //backend->send(cnt, size, waitScope);
>
>         executor->executeSync( [backend=backend, cnt=cnt, size=size] {
>                 backend->send(cnt, size);
>             });
>     }    
> }
>
> int main(int argc, const char* argv[]) {
>
>   auto be = kj::heap<BackendImpl>(10000);
>   backend = be.get(); // There must be a sexier way to do this
>   
>   // Set up a server.
>   capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223");
>
>   waitScope = &server.getWaitScope();
>
>   const kj::Executor& executor = kj::getCurrentThreadExecutor();
>   std::thread thr(data_pusher, &executor, 4096, 10000);
>   std::cout << "Backend Ready" << std::endl;
>
>   // Run forever, accepting connections and handling requests.
>   kj::NEVER_DONE.wait(*waitScope);
>   thr.join();
> }
>
> *client.py*
>
> capnp.remove_import_hook()
> a = capnp.load('./test.capnp')
>
> class ReceiverImpl(a.Receiver.Server):
>
>     def __init__(self, count):
>         self.count = count
>         self.has_received = False
>         self.recv_cnt = 0
>
>     def receive(self, msg, _context, **kwargs):
>         print("We get called, yay!")
>         #... benchmarking code omitted
>
> def direct_subscribe():
>
>     client = capnp.TwoPartyClient('127.0.0.1:11223')
>     backend = client.bootstrap().cast_as(a.Backend)
>     #backend = proxy.getBackend().backend
>
>     receiver = ReceiverImpl(10000)
>
>     backend.subscribe(receiver).wait()
>
>     while receiver.recv_cnt < 10000:
>         time.sleep(1)
>
> direct_subscribe()
>
> When I execute this code, without any wait() calls in the publisher, the 
> sending immediately completes async, but the receive() function of the 
> receiver never gets called.
>
> Thank you for your time and assistance.
> -Gym
>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/7e9ba9d1-6804-4cd5-bf4c-857dafd122c8n%40googlegroups.com.

Reply via email to