[ https://issues.apache.org/jira/browse/FLINK-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17054441#comment-17054441 ]

Dan Pettersson commented on FLINK-16319:
----------------------------------------

Hi again,

It's quite a big task to write the requirements/specification for a pubsub
module in SF, and as I prefer writing code, I've written some new code
instead :-)

I have separated the responsibilities in this latest code: the interface is now
responsible for sending the subscription to the "topicPartitionsFunction", and
when that function has registered the function as a subscriber, it sends itself
to the topicFunction as a subscriber.
 
Optimal would maybe be to block a publish call until it returns a PublishAck to
the publish caller; that way the ordering of events would be guaranteed.
With the current implementation the published events are stored in a FIFO "Queue"
and are handled in order.

But as the pubsub execution needs to be async, we have to listen to
AsyncCompletionResult and make sure that no other publish object is published
before the first publish object in the queue is done with its distribution to
the subscribers.

Maybe we can use something similar to
backPressureValve.notifyAsyncOperationCompleted()
but with a new method that notifies only when ALL AsyncOperations are completed,
to achieve a "PublishedFinished" signal?
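
Just to make the idea concrete, here is a minimal, self-contained sketch (single
threaded, not using any StateFun classes) of such a "notify when ALL async
operations are completed" gate; OrderedPublishQueue, distribute and
deliveryCompleted are hypothetical names, not anything from the prototype or the SDK:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.ToIntFunction;

// Hypothetical sketch: a FIFO publish queue that only starts distributing the
// next message once ALL async deliveries of the current one have completed --
// the "PublishedFinished" signal described above.
final class OrderedPublishQueue<T> {

  private final Queue<T> queue = new ArrayDeque<>();
  private final ToIntFunction<T> distribute; // starts the async deliveries, returns how many
  private int pendingDeliveries;             // open async deliveries for the head message

  OrderedPublishQueue(ToIntFunction<T> distribute) {
    this.distribute = distribute;
  }

  /** Called on the Publisher side; preserves publish order. */
  void publish(T message) {
    queue.add(message);
    if (pendingDeliveries == 0) {
      startNext();
    }
  }

  /** Analogue of notifyAsyncOperationCompleted(), called once per finished delivery. */
  void deliveryCompleted() {
    if (--pendingDeliveries == 0) {
      queue.poll();   // the head message is now fully published ("PublishedFinished")
      startNext();    // release the next message, if any
    }
  }

  private void startNext() {
    while (!queue.isEmpty()) {
      int started = distribute.applyAsInt(queue.peek());
      if (started > 0) {
        pendingDeliveries = started;
        return;
      }
      queue.poll();   // no subscribers: trivially "published"
    }
  }
}
{code}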


                     +FnPubSubBase+

+FnPubSubTopic+                    +FnPubSubTopicPartion+
Holds the Addresses of             Holds the subscribing
the TopicPartition functions       FunctionAddresses


When a subscribe occurs via the Subscriber interface, it calls the
FnPubSubTopicPartion function, which saves the function address and then sends a
subscribe.Ack to the parent FnPubSubTopic function.
 
The same goes for unsubscribe. The difference between a "pubsub+*topic*+" addressId
and a "pubsub+*topicpartion*+" addressId: the topicId is for example xxx.yyy.trade,
whereas the topicPartionId will be for example xxx.yyy.trade+*::1*+ or
xxx.yyy.trade+*::73*+, i.e. the topicId followed by "::" and the partionId.
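
To make the addressing scheme concrete, a minimal sketch using the SDK's Address
and FunctionType; the function type names ("pubsub"/"topic" and
"pubsub"/"topic-partition") are placeholders, not the ones used in the prototype:

{code:java}
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;

// Illustrative only: how the topic addressId and topic-partition addressId relate.
final class PubSubAddresses {

  static final FunctionType TOPIC_FN = new FunctionType("pubsub", "topic");
  static final FunctionType TOPIC_PARTITION_FN = new FunctionType("pubsub", "topic-partition");

  /** e.g. topicId "xxx.yyy.trade" -> the FnPubSubTopic address. */
  static Address topicAddress(String topicId) {
    return new Address(TOPIC_FN, topicId);
  }

  /** e.g. topicId "xxx.yyy.trade", partition 73 -> id "xxx.yyy.trade::73". */
  static Address topicPartitionAddress(String topicId, int partitionId) {
    return new Address(TOPIC_PARTITION_FN, topicId + "::" + partitionId);
  }
}
{code}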

As of now I'm using a PersistedTable with a LinkedHashMap as the tableAccessor
to guarantee the ordering of the published messages. I hope the above brief
explanation makes the code "easy" to follow so far...
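
For reference, a minimal sketch of how such an ordered buffer could be declared
with the SDK's PersistedTable; the state name, key type and value type below are
placeholders, and the LinkedHashMap-backed accessor mentioned above is what would
give the insertion-order guarantee:

{code:java}
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedTable;

// Sketch only: an ordered buffer of published messages, keyed by a sequence number.
final class PublishBufferFunction implements StatefulFunction {

  @Persisted
  private final PersistedTable<Long, String> publishBuffer =
      PersistedTable.of("publish-buffer", Long.class, String.class);

  @Override
  public void invoke(Context context, Object input) {
    // e.g. append the incoming publish message under the next sequence number,
    // and drain the buffer in insertion order once the previous distribution finished.
  }
}
{code}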

Before writing any tests I just want to make sure that this code is on the right
track when it comes to subscribing to topics at runtime. To register topic
subscriptions eagerly, my current idea is to have a protected (empty) method
along the lines of "List<Topics> subscribeToTopics()": since we have the Context
available in StatefulMatchFunction, we could register the subscription(s) in
"ensureInitialized" in StatefulMatchFunction.

1) Move the pubsub module into the SDK when ready. The problem is that I can't
test the pubsub code with the testutil module if the code is in the SDK, as we
then have a circular dependency.

2) There is no current implementation of how to subscribe "eagerly", but maybe
the "empty method" solution in StatefulMatchFunction above might work?

3) The hardcoded "static final int PARTITIONS = 1000" can easily be made
configurable, but if this value is changed between deployments we need to keep
track of the previous partition count and rehash the subscriptions via
bootstrap? (See the sketch below.)
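
Purely as an illustration of why the rehashing would be needed (the hashing below
is one possible scheme, not necessarily what the prototype does):

{code:java}
// Illustrative only: mapping a subscription onto one of the PARTITIONS
// FnPubSubTopicPartion functions. Changing PARTITIONS between deployments
// changes this mapping, so existing subscriptions would have to be rehashed,
// e.g. via a state bootstrap job.
final class PartitionRouting {

  static final int PARTITIONS = 1000; // currently hardcoded in the prototype

  /** e.g. ("xxx.yyy.trade", "broker1-ERIC") -> "xxx.yyy.trade::<0..999>". */
  static String topicPartitionId(String topicId, String subscriberId) {
    int partition = Math.floorMod(subscriberId.hashCode(), PARTITIONS);
    return topicId + "::" + partition;
  }
}
{code}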

As all my questions indicate :-) this is a prototype of how a pubsub
implementation could work in SF. I would be happy to work more on this in the
coming weeks/months, but I need your input and ideas on how to make a pubsub
module part of the SDK.

Please see
[https://github.com/danp11/flink-statefun/tree/master/statefun-pubsub] for the
current code, and let me know your thoughts when you have the time.
 
/Dan

> Pubsub-/Broadcast implementation
> --------------------------------
>
>                 Key: FLINK-16319
>                 URL: https://issues.apache.org/jira/browse/FLINK-16319
>             Project: Flink
>          Issue Type: New Feature
>          Components: Stateful Functions
>            Reporter: Dan Pettersson
>            Priority: Major
>
> Hi everyone,
>  
> I have a use case where the id of the functions is brokerId + instrumentId
> and they receive trades and orders. The instrument has state changes (Open,
> halted, closed, etc.) that almost all of the functions are interested in. Some
> functions only want, for example, the Close message, whereas other functions
> want all state changes for the specific instrument.
>  
> I've built a statefun pubsub module that exposes two interfaces, Subscriber 
> and Publisher, with these two methods:
>  
> default void subscribe(Context context, Subscription... subscriptions)
>   
> default void publish(Context context, PublishMessage publishMessage)
>  
> Behind the interfaces is a hidden StatefulPubSubFunction that keeps track of
> which partition the subscriber is located in and which topic it listens to.
>  
> Code is located under 
> [https://github.com/danp11/flink-statefun/tree/master/statefun-pubsub] if 
> anyone is interested.
>  
> This code is a "classic pub sub" pattern and I think that this kind of
> functionality would be a great addition to Stateful Functions. I created this
> Jira to see if there is interest in discussing what an optimal
> pubsub/broadcast solution would look like in SF. Igal has previously
> mentioned that Broadcast could be a good fit for this kind of flow.
> At the moment I unfortunately don't know the internals of SF and/or Flink
> well enough to come up with a proposal myself.
>  
> I know you are very busy at the moment (it's impressive how much you have
> produced in just the last couple of weeks! :-) but if someone, on a high level,
> has any ideas on where and how a pub sub pattern could be implemented, I'd
> really appreciate it. In the future I hope we can come up with a proposal
> together, as I need your help here. If you think that a pubsub/broadcast
> solution would make SF better, that is :-)
>  
> Hope to hear your thoughts on this!
>  
> Thanks,
>  
>  /Dan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
