In your send function you should
request.send().detach([&](kj::Exception&& error) { /*handle returned
error*/; });
If you don't the promise is destructed and the call gets cancelled.
On Friday, May 6, 2022 at 1:22:50 PM UTC+2 Gyorgy Miru wrote:
> Hi all,
>
> I am trying to implement a simple architecture where various backend
> servers gather data (on Android devices) and push them to the subscribers.
> In the backend the data gatherer runs is a separate thread from the server.
> My current issue is when messages are sent to the subscriber, they are not
> received on the client side. I am not sure if I need to wait on the promise
> that is returned by the send() call in order to actually place the message
> on the wire or maybe the problem is completely unrelated.
>
> First, I've tried using executor->executeSync(), from the gatherer
> thread, to the send the messages, but then I can't wait on the promise
> because the code runs in an event callback.
> I've also tried to start a kj event loop in the gatherer thread and
> publish the messages from there, but then when I try to wait I get the
> following exception " Nothing to wait for; this thread would hang forever."
> Please find the relevant code samples below:
>
> *test.capnp*
>
> @0xf84f5237f35aec13;
>
> interface Proxy {
> getBackend @0 () -> (backend: Backend);
> }
>
> interface Backend {
> consume @0 (msg :Message);
> subscribe @1 (receiver :Receiver);
> }
>
> interface Receiver {
> receive @0 (msg: Message);
> }
>
> struct Message {
> body @0 :Data;
> }
>
> *backend.cpp*
>
> class BackendImpl final: public Backend::Server {
> private:
> bool first_received;
> bool receiver_ready;
> unsigned recv_count;
> unsigned count;
> unsigned size;
> chrono::time_point<chrono::high_resolution_clock> begin;
> Receiver::Client receiver;
>
> public:
> BackendImpl(unsigned cnt): first_received(false),
> receiver_ready(false), count(cnt), receiver(nullptr) {};
> kj::Promise<void> consume(ConsumeContext context) override {
> //... Benchmark tests are omitted
> return kj::READY_NOW;
> }
>
> kj::Promise<void> subscribe(SubscribeContext context) override {
> this->receiver = context.getParams().getReceiver();
> cout << "Receiver received" << endl;
> this->receiver_ready = true;
> return kj::READY_NOW;
> }
>
> //bool send(int cnt, int size, kj::WaitScope& ws) { // I have
> tried passing the wait scope
> bool send(int cnt, int size) {
> if (!this->receiver_ready) {
> return false;
> }
> this->receiver_ready = false;
> cout << "Sending data" << endl;
> unsigned char* msg = new unsigned char[size];
> memset(msg, 'a', size);
>
>
> for (int i = 0; i < this->count; i++) {
>
> auto request = this->receiver.receiveRequest();
> request.getMsg().setBody(capnp::Data::Reader(msg, size));
> auto promise = request.send();
> //auto promise = request.send().wait(ws);
>
> auto res =
> promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){
> cout << result.totalSize().wordCount << endl << flush;
> // This never gets called
> return result.totalSize().wordCount;
> });
>
> }
> delete[] msg;
> cout << "Sending done" << endl << flush;
> return true;
> }
> };
>
> BackendImpl* backend;
> void data_pusher(const kj::Executor* executor, int size, int cnt) {
>
> //kj::EventLoop loop;
> //loop.run();
> //kj::WaitScope waitScope(loop);
> while (executor->isLive()) {
> this_thread::sleep_for(1s);
>
> //backend->send(cnt, size, waitScope);
>
> executor->executeSync( [backend=backend, cnt=cnt, size=size] {
> backend->send(cnt, size);
> });
> }
> }
>
> int main(int argc, const char* argv[]) {
>
> auto be = kj::heap<BackendImpl>(10000);
> backend = be.get(); // There must be a sexier way to do this
>
> // Set up a server.
> capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223");
>
> waitScope = &server.getWaitScope();
>
> const kj::Executor& executor = kj::getCurrentThreadExecutor();
> std::thread thr(data_pusher, &executor, 4096, 10000);
> std::cout << "Backend Ready" << std::endl;
>
> // Run forever, accepting connections and handling requests.
> kj::NEVER_DONE.wait(*waitScope);
> thr.join();
> }
>
> *client.py*
>
> capnp.remove_import_hook()
> a = capnp.load('./test.capnp')
>
> class ReceiverImpl(a.Receiver.Server):
>
> def __init__(self, count):
> self.count = count
> self.has_received = False
> self.recv_cnt = 0
>
> def receive(self, msg, _context, **kwargs):
> print("We get called, yay!")
> #... benchmarking code omitted
>
> def direct_subscribe():
>
> client = capnp.TwoPartyClient('127.0.0.1:11223')
> backend = client.bootstrap().cast_as(a.Backend)
> #backend = proxy.getBackend().backend
>
> receiver = ReceiverImpl(10000)
>
> backend.subscribe(receiver).wait()
>
> while receiver.recv_cnt < 10000:
> time.sleep(1)
>
> direct_subscribe()
>
> When I execute this code, without any wait() calls in the publisher, the
> sending immediately completes async, but the receive() function of the
> receiver never gets called.
>
> Thank you for your time and assistance.
> -Gym
>
--
You received this message because you are subscribed to the Google Groups
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/capnproto/7e9ba9d1-6804-4cd5-bf4c-857dafd122c8n%40googlegroups.com.