[
https://issues.apache.org/jira/browse/FLINK-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17054441#comment-17054441
]
Dan Pettersson commented on FLINK-16319:
----------------------------------------
Hi again,
Writing the requirements/specification for a pubsub module in SF is quite a
big task, and as I prefer writing code, I've written some new code
instead :-)
I have separated the responsibilities in this latest code. The interface now
has the responsibility to send the subscription to the
"topicPartitionsFunction", and when that function has registered the function
as a subscriber, it sends itself to the topicFunction as a subscriber.
Ideally, a publish call would maybe block until a PublishAck is returned to
the publish caller. That way the ordering of events would be guaranteed.
With the current implementation, published events are stored in a FIFO "Queue"
and are handled in order.
But as the pubsub execution needs to be async, we need to listen to
AsyncCompletionResult and make sure that no other publish object is published
before the first publish object in the queue is done with its distribution to
the subscribers.
Maybe we can use something similar to
backPressureValve.notifyAsyncOperationCompleted()
but with a new method that notifies only when ALL AsyncOperations are completed,
to achieve a "PublishedFinished" signal?
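To make the idea concrete, here is a minimal sketch of such a gate in plain Java. It does not use any SF or Flink classes: OrderedPublisher, addSubscriber and the use of CompletableFuture as the "async operation" are all assumptions made for illustration, and CompletableFuture.allOf stands in for the proposed "PublishedFinished" signal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch: distributes queued messages one at a time, in FIFO order. */
final class OrderedPublisher<M> {
  private final Queue<M> pending = new ArrayDeque<>();
  private final List<Function<M, CompletableFuture<Void>>> subscribers = new ArrayList<>();
  private boolean inFlight = false;

  /** A subscriber returns a future that completes when its delivery is done. */
  synchronized void addSubscriber(Function<M, CompletableFuture<Void>> subscriber) {
    subscribers.add(subscriber);
  }

  /** Enqueues the message; it is distributed once all earlier messages are done. */
  synchronized void publish(M message) {
    pending.add(message);
    drain();
  }

  private synchronized void drain() {
    if (inFlight) {
      return; // the head of the queue is still being distributed
    }
    M head = pending.poll();
    if (head == null) {
      return; // nothing left to publish
    }
    inFlight = true;
    CompletableFuture<?>[] deliveries =
        subscribers.stream().map(s -> s.apply(head)).toArray(CompletableFuture[]::new);
    // allOf completes only when ALL deliveries are completed ("PublishedFinished").
    // In this sketch a failed delivery also releases the gate.
    CompletableFuture.allOf(deliveries)
        .whenComplete(
            (ok, err) -> {
              synchronized (this) {
                inFlight = false;
              }
              drain();
            });
  }
}
```

With this gate, a second publish() is only handed to the subscribers after every delivery future of the first one has completed, which is the ordering guarantee described above.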
                   +FnPubSubBase+
+FnPubSubTopic+              +FnPubSubTopicPartion+
Holds the addresses          Holds the addresses of the
of the TopicPartions         subscribing functions
When a subscribe comes in through the Subscriber interface, it calls the
FnPubSubTopicPartion,
which saves the function address and then sends a subscribe.Ack to the parent
FnPubSubTopic function.
The same goes for unsubscribe. The difference between a "pubsub+*topic*+"
addressId and a "pubsub+*topicpartion*+" addressId is, for example,
xxx.yyy.trade for the topicId, and then the topicPartionId will be for example
xxx.yyy.trade+*::1*+ or xxx.yyy.trade*+::73+*, i.e. the topicId followed by
"::" and the partionId.
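The id scheme can be sketched in a few lines of Java. PubSubIds and its method names are hypothetical and only illustrate the "topicId::partionId" convention described above; they are not taken from the linked code.

```java
/** Hypothetical helper illustrating the "topicId::partitionId" id scheme. */
final class PubSubIds {
  static final String SEPARATOR = "::";

  private PubSubIds() {}

  /** Builds e.g. "xxx.yyy.trade::73" from "xxx.yyy.trade" and 73. */
  static String topicPartitionId(String topicId, int partition) {
    return topicId + SEPARATOR + partition;
  }

  /** Extracts the topic id, i.e. everything before the last "::". */
  static String topicIdOf(String topicPartitionId) {
    return topicPartitionId.substring(0, separatorIndex(topicPartitionId));
  }

  /** Extracts the numeric partition id, i.e. everything after the last "::". */
  static int partitionOf(String topicPartitionId) {
    int idx = separatorIndex(topicPartitionId);
    return Integer.parseInt(topicPartitionId.substring(idx + SEPARATOR.length()));
  }

  private static int separatorIndex(String id) {
    int idx = id.lastIndexOf(SEPARATOR);
    if (idx < 0) {
      throw new IllegalArgumentException("Not a topic partition id: " + id);
    }
    return idx;
  }
}
```

Using lastIndexOf means a topic id that itself contains "::" would still be split at the final separator; whether such topic ids should be allowed at all is an open design choice.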
As of now I'm using the PersistedTable with a LinkedHashMap as a tableAccessor
to guarantee the ordering of the published messages. I hope that reading the
code, with the above simple explanations, will make it "easy" to follow the
code so far...
Before writing any tests I just want to make sure that this code is on the right
track when it comes to subscribing to topics at runtime. To eagerly register
topic subscriptions, my current idea is to have a protected (empty) method
("List<Topics> subscribeToTopics"), as we have the context with us in
StatefulMatchFunction and we could register the subscription(s) in
"ensureInitialized" in StatefulMatchFunction.
1) Move the pubsub module into the SDK when ready. The problem is that I can't
test the pubsub code with the testutil module if the code is in the SDK, as we
then have a circular dependency.
2) There is currently no implementation for subscribing "eagerly", but maybe
the "empty method" solution in StatefulMatchFunction above might work?
3) Hardcoded "static final int PARTITIONS = 1000", which can easily be made
configurable, but if this value is changed between deployments we need to keep
track of the previous partition value and perform a rehashing of the
subscriptions via bootstrap?
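To illustrate point 3, here is a small sketch of what such a rehashing step could look like. How subscribers are actually assigned to partitions is not described above, so the hash-modulo assignment in partitionFor is an assumption, and PartitionRehash is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of re-partitioning subscriptions after PARTITIONS changes. */
final class PartitionRehash {
  private PartitionRehash() {}

  /** Assumed assignment: hash of the subscriber id modulo the partition count. */
  static int partitionFor(String subscriberId, int partitions) {
    return Math.floorMod(subscriberId.hashCode(), partitions);
  }

  /** Regroups every subscriber from the old layout under the new partition count. */
  static Map<Integer, List<String>> rehash(
      Map<Integer, List<String>> oldLayout, int newPartitions) {
    Map<Integer, List<String>> newLayout = new HashMap<>();
    for (List<String> subscriberIds : oldLayout.values()) {
      for (String id : subscriberIds) {
        newLayout
            .computeIfAbsent(partitionFor(id, newPartitions), k -> new ArrayList<>())
            .add(id);
      }
    }
    return newLayout;
  }
}
```

With modulo hashing, changing the partition count moves most subscribers to a different partition, which is why the previous value has to be known when the subscriptions are bootstrapped.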
As all my questions indicate :-) this is a prototype of how a pubsub impl.
could work in SF. I would be happy to work more on this in the coming
weeks/months, but I need your input and ideas on how to make a pubsub module
part of the SDK.
Please see
[https://github.com/danp11/flink-statefun/tree/master/statefun-pubsub] for the
current code and please let me know your thoughts when you have the time.
/Dan
> Pubsub-/Broadcast implementation
> --------------------------------
>
> Key: FLINK-16319
> URL: https://issues.apache.org/jira/browse/FLINK-16319
> Project: Flink
> Issue Type: New Feature
> Components: Stateful Functions
> Reporter: Dan Pettersson
> Priority: Major
>
> Hi everyone,
>
> I have a use case where the ids of the functions are brokerId + instrumentId
> and the functions receive trades and orders. The instrument has state changes
> (Open, halted, closed etc) that almost all the functions are interested in.
> Some functions only want, for example, the Close message whereas other
> functions want all state changes for the specific instrument.
>
> I've built a statefun pubsub module that exposes two interfaces, Subscriber
> and Publisher, with these two methods:
>
> default void subscribe(Context context, Subscription... subscriptions)
>
> default void publish(Context context, PublishMessage publishMessage)
>
> Behind the interfaces is a hidden StatefulPubSubFunction that keeps track of
> which partition the subscriber is located in and which topic it listens to.
>
> Code is located under
> [https://github.com/danp11/flink-statefun/tree/master/statefun-pubsub] if
> anyone is interested.
>
> This code is a "classic pub sub" pattern and I think that this kind of
> functionality would be a great addition to Stateful Functions. I created this
> Jira to see if there is interest in discussing what an optimal
> pubsub-/broadcast solution would look like in SF. Igal has previously
> mentioned that Broadcast could be a good fit for this kind of flow.
> At the moment I don't know the internals of SF and/or Flink well enough to
> come up with a proposal myself, unfortunately.
>
> I know you are very busy at the moment (It's impressive how much you have
> produced in only the last couple of weeks! :-) but if someone, on a high level,
> has any ideas on where and how a pub sub pattern could be implemented I'd
> really appreciate it. In the future I hope we can come up with a proposal
> together as I need your help here. If you think that a pubsub-/broadcast
> solution would make SF better that is :-)
>
> Hope to hear your thoughts on this!
>
> Thanks,
>
> /Dan