Hi All,

Implementing a similar thing, but with an asynchronous client in python. I 
can't for the life of me get the client object to become derefenced. 
The asyncio tx/rx seems to get stuck trying to read/write to the 
now-missing publisher.

I think the async loop owns a reference to the subscriber object (as well 
as the publisher), but I need to know when the publisher releases the 
reference in order to stop the async loop. 

Any ideas? 

On Friday, 3 December 2021 at 20:02:18 UTC+2 ken...@cloudflare.com wrote:

> What I usually do is something like:
>
> auto impl = kj::heap<SubscriberImpl>();
> auto& ref = *impl;
> Subscriber::Client client = kj::mv(impl);
> rq.setSubscriber(client);
>
> Now you can keep a copy of `client` locally, and as long as it still 
> exists, then `ref` remains valid -- because `client` is itself a strong 
> reference to the object.
>
> Note this doesn't work if you need to get notification of when the 
> `SubscriberImpl` is dropped from the remote side, since holding a strong 
> ref locally prevents the destructor from being called. If this is an issue, 
> then you need to do something different. What I usually do here is store a 
> pointer somewhere, and then have the destructor null out that pointer. This 
> effectively implements a weak reference.
>
> -Kenton
>
> On Thu, Dec 2, 2021 at 2:22 PM Jens Alfke <jens....@gmail.com> wrote:
>
>> I'm also implementing pub-sub, so I was glad to see this thread before I 
>> wasted too much time. I'm implementing this pattern, but having trouble on 
>> the client side.
>>
>> In terms of the example interface, I've created my SubscriberImpl class, 
>> and written the code to send the "subscribe" message. Now I want to store 
>> the returned Subscription capability reference in my SubscriberImpl, so it 
>> can own it and drop it when it's done.
>>
>> However, I can't figure out how to keep a reference to the 
>> SubscriberImpl, since I have to move it to the Request object (calling 
>> setSubscriber) and afterwards it's gone, so I can't call it again.
>>
>>             auto rq = remotePublisher.subscribeRequest();
>>             auto impl = kj::heap<SubscriberImpl>();
>>             rq.setSubscriber(std::move(impl));
>>             auto promise = rq.send().then([&](auto response) {return 
>> response.getSubscription();});
>>             // *somehow convey the promise to the SubscriberImpl...?*
>>
>> I'm sure this is just due to my incomplete understanding of how 
>> Promise/Client/Server objects work...
>> Thanks,
>>
>> --Jens
>>
>> On Monday, November 22, 2021 at 8:53:42 AM UTC-8 ken...@cloudflare.com 
>> wrote:
>>
>>> Hi Mitsuo,
>>>
>>> I recommend designing the interface like this:
>>>
>>> interface EventPublisher{
>>>     interface Subscriber {
>>>         updateEvent @0 (event: Int32) -> ();
>>>     }
>>>
>>>     interface Subscription {}
>>>     subscribe @0 (subscriber: Subscriber) -> (result: Int32, 
>>> subscription: Subscription);
>>>     # To unsubscribe, drop the returned `subscription`.
>>> }
>>>
>>>
>>> Here, subscribe() returns a `subscription` object. This object has no 
>>> methods. But, when the capability is dropped, then the destructor will run 
>>> on the server side. Atn that point, you can remove the subscription.
>>>
>>> A big advantage of this approach is that it handles connection failures 
>>> gracefully. If the client randomly disconnects, the `subscription` 
>>> capability is automatically dropped, thus unsubscribing the client. This 
>>> way you don't end up stuck sending messages to a disconnected subscriber.
>>>
>>> It also solves your problem because you can remember arbitrary metadata 
>>> about the subscriber within the `Subscription` object, so you know what to 
>>> remove when it's destroyed.
>>>
>>> -Kenton
>>>
>>> On Mon, Nov 22, 2021 at 12:40 AM mitsuo <mitsu...@woven-planet.global> 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to implement pub/sub like communication model with the 
>>>> following scheme.
>>>> *pubsub.capnp*
>>>> *interface EventPublisher{*
>>>> *    interface Subscriber {*
>>>> *        updateEvent @0 (event: Int32) -> ();*
>>>> *    }*
>>>> *    subscribe @0 (subscriber: Subscriber) -> (result: Int32);*
>>>> *    unsubscribe @1 (subscriber: Subscriber) -> (result: Int32);*
>>>> *}*
>>>>
>>>> I'm using *kj::Vector<EventPublisher::Subscriber::Client> 
>>>> m_subscribers *to store the current subscribers.
>>>> When I try to implement unsubscribe and remove the Subscriber from the 
>>>> Vector, I couldn't find good method to do that.
>>>> Could you give me some advice?
>>>>
>>>> *server implementation*
>>>> *class EventPublisherImpl final : public EventPublisher::Server {*
>>>> * protected:*
>>>> *  ::kj::Promise<void> subscribe(SubscribeContext context) {*
>>>> *    cout << "subscribe request received" << endl;*
>>>> *    m_subscribers.add(context.getParams().getSubscriber());*
>>>> *    return kj::READY_NOW;*
>>>> *  }*
>>>>
>>>> *  ::kj::Promise<void> unsubscribe(UnsubscribeContext context) {*
>>>> *    cout << "unsubscribe request received" << endl;*
>>>> *    auto unsub = context.getParams().getSubscriber();*
>>>>
>>>> *    // I want to remove usub from subscribers like 
>>>> m_subscribers[unsub].erase();*
>>>> *    // But I couldn't find a method to compare*
>>>> *    // "EventPublisher::Subscriber::Client" such as == operator or 
>>>> public method*
>>>> *    // to distinguish the client.*
>>>> *    //*
>>>> *    // One solution is having an additional argument(id) for this 
>>>> purpose but*
>>>> *    // that requres additional management of ID.*
>>>> *    //  subscribe @0 (subscriber: Subscriber, id: Int32) -> (result: 
>>>> Int32);*
>>>> *    //  unsubscribe @1 (id: Int32) -> (result: Int32);*
>>>>
>>>> *    // what I can do is erase everything but this is not my goal*
>>>> *    m_subscribers.clear();*
>>>> *    return kj::READY_NOW;*
>>>> *  }*
>>>>
>>>> * private:*
>>>> *  kj::Vector<EventPublisher::Subscriber::Client> m_subscribers;*
>>>> *};*
>>>> Thank you,
>>>>
>>>> -- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "Cap'n Proto" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to capnproto+...@googlegroups.com.
>>>> To view this discussion on the web visit 
>>>> https://groups.google.com/d/msgid/capnproto/26c4964f-21a2-4fe2-a410-7673786d40c9n%40googlegroups.com
>>>>  
>>>> <https://groups.google.com/d/msgid/capnproto/26c4964f-21a2-4fe2-a410-7673786d40c9n%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>> .
>>>>
>>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Cap'n Proto" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to capnproto+...@googlegroups.com.
>>
> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/capnproto/9e88207c-8de2-4e98-a8a8-1ae424453279n%40googlegroups.com
>>  
>> <https://groups.google.com/d/msgid/capnproto/9e88207c-8de2-4e98-a8a8-1ae424453279n%40googlegroups.com?utm_medium=email&utm_source=footer>
>> .
>>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Cap'n Proto" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to capnproto+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/capnproto/757a6cab-66cc-4657-ba81-37d7d5ae5efen%40googlegroups.com.

Reply via email to