Hi Andrew,

I think your reasoning is correct. One way to implement the de-duping (if the
db operations are not idempotent by themselves) is to save the sequence number
in the external db together with the writes. When starting up, before
replaying, you load the latest seq num from the db, and then you know that
you can ignore all replayed events with a seq num up to and including that one.
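
To make that concrete, here is a minimal sketch in plain Scala. It does not
use the Akka API; FakeDb, Replayed, AllUsersProjection and DedupDemo are
hypothetical names that only illustrate the idea, and the in-memory db stands
in for whatever external store you use. With a real db you would probably
want the row update and the seq num update in the same transaction.

```scala
// Hypothetical event type and replay envelope (not Akka classes).
final case class FirstNameChanged(userId: String, firstName: String)
final case class Replayed(seqNr: Long, event: FirstNameChanged)

// In-memory stand-in for the external db (assumption, for illustration only).
final class FakeDb {
  private var names = Map.empty[String, String]
  private var lastSeqNr = 0L
  private var writeCount = 0

  def loadLastSeqNr(): Long = lastSeqNr
  def writes: Int = writeCount
  def firstName(userId: String): Option[String] = names.get(userId)

  // Stores the data and the seq num together; a real db would do this
  // in one transaction.
  def write(seqNr: Long, e: FirstNameChanged): Unit = {
    names = names.updated(e.userId, e.firstName)
    lastSeqNr = seqNr
    writeCount += 1
  }
}

final class AllUsersProjection(db: FakeDb) {
  // Loaded once at startup, before any replayed event is handled.
  private var lastStored: Long = db.loadLastSeqNr()

  def handle(r: Replayed): Unit =
    if (r.seqNr > lastStored) {
      db.write(r.seqNr, r.event)
      lastStored = r.seqNr
    } // otherwise the event is already in the db and is ignored
}

object DedupDemo {
  // Handles two events, simulates a crash, then replays the whole journal.
  def run(): FakeDb = {
    val journal = Seq(
      Replayed(1L, FirstNameChanged("u1", "Ann")),
      Replayed(2L, FirstNameChanged("u1", "Anna")),
      Replayed(3L, FirstNameChanged("u2", "Bob"))
    )
    val db = new FakeDb
    val beforeCrash = new AllUsersProjection(db)
    journal.take(2).foreach(beforeCrash.handle)

    // New instance after the "crash": it reloads the seq num from the db.
    val afterRestart = new AllUsersProjection(db)
    journal.foreach(afterRestart.handle)
    db
  }
}
```

In DedupDemo.run() the restarted projection sees all three events again, but
only the third one results in a db write, because the first two have a seq num
that is not higher than the one loaded from the db.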

/Patrik

On Wed, Apr 8, 2015 at 6:34 PM, Andrew Easter <[email protected]>
wrote:

> Okay, I've been reading more about PersistentView.
>
> I'm thinking I could achieve what I'm referring to here through the use of
> view snapshots?
>
> i.e.
>
> 1) Periodically save snapshots every N hours/minutes/seconds
> 2) Save snapshot on shutdown of the PersistentView so that when it's
> recreated, it only starts consuming from last processed seq num
>
> If the view writes to the db and, for whatever reason, crashes before
> persisting a snapshot that incorporates some handled seq nums, it's simply
> the case that updates to the db need to be idempotent, or some method of
> de-duping needs to be in place?
>
> On a related note, if the last seq number is relied upon, what happens in
> the case a write to the db fails? Of course, the write could be retried a
> few times, but, if it's still not successful following a max number of
> retries, I guess one would have to give up on it and log the error
> somewhere such that the problem is at least identified?
>
> On Wednesday, 8 April 2015 16:24:38 UTC+1, Andrew Easter wrote:
>>
>> Hi Patrik.
>>
>> Sorry to drag this up so long after it was posted, but I have a question
>> about it...
>>
>>
>>> Let's say we have a User aggregate root with some profile information
>>> that can be updated. The user is represented by a User
>>> EventsourcedProcessor actor, which is sharded. On the query side we want to
>>> be able to search users by first and last name, i.e. we want to store all
>>> users in a relational database table on the query side.
>>
>>
>>
>> The User actor persists FirstNameChanged, and inside the persist block it
>>> sends a Persistent(FirstNameChanged) message to the AllUsers Processor. On
>>> the query side we have an AllUsersView connected to that processor. When
>>> AllUsersView receives FirstNameChanged it updates the db table.
>>
>>
>>
>> To handle lost messages between User and AllUsers you might want to send
>>> an acknowledgement from AllUsers to User, and have a retry mechanism in
>>> User. I would implement that myself in User, but PersistentChannel might be
>>> an alternative.
>>
>>
>> I'm a little confused about how this solution avoids writing every event
>> from AllUsers to the db table on every recovery of AllUsersView. In your
>> original post, you contrasted this approach with another, in which you said:
>>
>> It must keep track of how far it has replayed/stored in db, i.e. seqNr
>>> must be stored in the db.
>>
>>
>> The implication is that the first approach (mentioned above) would _not_
>> need to keep track of the seq number in the db - i.e. that would only be
>> required in the second approach. However, I can't see how this would avoid,
>> during recovery of the AllUsersView, re-writing every event to the database
>> unless referring to a seq number stored in the db.
>>
>> Am I missing something?
>>
>> Thanks,
>> Andrew
>>
>>
>> On Thursday, 29 January 2015 08:30:09 UTC, Patrik Nordwall wrote:
>>>
>>>
>>>
>>> On Thu, Jan 22, 2015 at 9:01 PM, Yann Simon <[email protected]> wrote:
>>>
>>>> Hi Patrik,
>>>>
>>>> On Sunday, 20 April 2014 16:59:22 UTC+2, Patrik Nordwall wrote:
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Apr 20, 2014 at 2:47 PM, Olger Warnier <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Patrik,
>>>>>>
>>>>>> Sounds like an interesting approach, storing some meta-data at the
>>>>>> view may help to check / show the reliability of the system.
>>>>>>
>>>>>> At this moment the events are sent to a processor per node that
>>>>>> publishes the event (distributed pub sub)
>>>>>>
>>>>>
>>>>> That sounds good, as well.
>>>>>
>>>>>
>>>>>> When you talk about view, that's the akka-persistence view ?
>>>>>>
>>>>>
>>>>> Yes, persistence.View and persistence.Processor
>>>>>
>>>>>
>>>>>> So more or less, the sub processors could send messages to the View
>>>>>> and when there is a Persist() around it, it will be stored.
>>>>>>
>>>>>
>>>>> I'm not sure I understand what you mean here. Let me clarify my
>>>>> proposal with an example. Let's say we have a User aggregate root with
>>>>> some profile information that can be updated. The user is represented
>>>>> by a User EventsourcedProcessor actor, which is sharded. On the query
>>>>> side we want to be able to search users by first and last name, i.e. we
>>>>> want to store all users in a relational database table on the query side.
>>>>>
>>>>> The User actor persists FirstNameChanged, and inside the persist block
>>>>> it sends a Persistent(FirstNameChanged) message to the AllUsers Processor.
>>>>> On the query side we have an AllUsersView connected to that processor. When
>>>>> AllUsersView receives FirstNameChanged it updates the db table.
>>>>>
>>>>> To handle lost messages between User and AllUsers you might want to
>>>>> send an acknowledgement from AllUsers to User, and have a retry mechanism
>>>>> in User. I would implement that myself in User, but PersistentChannel
>>>>> might be an alternative.
>>>>>
>>>>
>>>> Let's say the current version in production only has User actors.
>>>> Now we want to deliver a new version that includes the new Query with
>>>> the AllUsers Processor.
>>>> How can we be sure that AllUsers receives all the events it needs to
>>>> construct its state?
>>>>
>>>
>>> I'm afraid there is no API to retrieve all ids, see feature request
>>> https://github.com/akka/akka/issues/13892
>>>
>>> For the moment I guess you have to try to retrieve them from the journal
>>> data store.
>>>
>>> Regards,
>>> Patrik
>>>
>>>
>>>>
>>>> Thanks in advance,
>>>> Yann
>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> Is that a correct understanding ?
>>>>>>
>>>>>> Kind regards,
>>>>>>
>>>>>> Olger
>>>>>>
>>>>>>
>>>>>> On Sunday, April 20, 2014 2:32:07 PM UTC+2, Patrik Nordwall wrote:
>>>>>>
>>>>>>> Hi Olger,
>>>>>>>
>>>>>>> What if you keep the sharded event sourced actors (+10k), but let
>>>>>>> them also send the events to one or a few processors? Then you can
>>>>>>> connect the views/streams to these processors.
>>>>>>>
>>>>>>> If you don't like storing the events twice you can instead store
>>>>>>> some meta-data (processor-id, seq-no, timestamp) and have a view that
>>>>>>> creates sub-views on demand from the replayed meta-data. The sub-views
>>>>>>> would forward to the parent aggregated view.
>>>>>>>
>>>>>>> /Patrik
>>>>>>>
>>>>>>> On 19 Apr 2014, at 20:46, Olger Warnier <[email protected]> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> I had to think about it a little; here is my follow-up. (I hope you don't
>>>>>>> mind the continued discussion, it helps me a lot in defining the right
>>>>>>> approach, thanks for that.)
>>>>>>>
>>>>>>> On Saturday, April 19, 2014 7:11:23 AM UTC+2, Martin Krasser wrote:
>>>>>>>>
>>>>>>>>  Hi Olger,
>>>>>>>>
>>>>>>>> installing 10k views/producers won't scale, at least not with the
>>>>>>>> current implementation. Here are some alternatives:
>>>>>>>>
>>>>>>> Interesting, what would need to change to make it scale?
>>>>>>> (The idea is to have the EventsourcedProcessors reflect a DDD-style
>>>>>>> Aggregate Root instance and have those distributed using cluster
>>>>>>> sharding.)
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> - Maybe a custom journal plugin is what you need: a plugin that
>>>>>>>> delegates all write/read requests to the actual journal actor and that
>>>>>>>> additionally updates a database with the events to be written. This
>>>>>>>> essentially installs a single "listener" per ActorSystem (this is to
>>>>>>>> some extent comparable to a database trigger that executes additional
>>>>>>>> commands. If the backend datastore supports that directly, I recommend
>>>>>>>> implementing the trigger there, if possible).
>>>>>>>>
>>>>>>>
>>>>>>> I am not sure I understand it. The basic idea is to have the
>>>>>>> 'events' stored via the EventsourcedProcessor published to 'n' views.
>>>>>>> The actual number of views that need to listen to these events is not
>>>>>>> known up front (people can add their own views; at system startup, it
>>>>>>> will be clear).
>>>>>>> As every eventsourced actor is actually an AggregateRoot (in DDD
>>>>>>> terms) and thereby something of an instance with its own state, the
>>>>>>> changes in these states need to be aggregated (that can be done with the
>>>>>>> streaming as you mention) and published to the views that are interested
>>>>>>> (subscribed).
>>>>>>> Doing this by hand in the aggregate root actor is not a problem:
>>>>>>> you then write your own listener actor, which populates a view data
>>>>>>> store. Still, I have the feeling that the actual 'View' (or ViewProducer)
>>>>>>> could be implemented in such a way that it's done by the view.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> - Instead of having thousands of processors, what speaks against
>>>>>>>> combining them into a single processor (or only a few) per node?
>>>>>>>>
>>>>>>> This would mean that I'll have all my aggregate root instances
>>>>>>> running in 1 processor, meaning that I need to reconstruct state per
>>>>>>> aggregate root instance in some way. Using EventsourcedProcessor, I'd
>>>>>>> expect that I need to replay everything for all instances and pick the
>>>>>>> one that I need for processing at that moment (this can of course be
>>>>>>> optimized with snapshots and something like memcached). This feels like
>>>>>>> a performance hit to me.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> Further comments inline ...
>>>>>>>>
>>>>>>>> On 18.04.14 16:10, Olger Warnier wrote:
>>>>>>>>
>>>>>>>> Hi Martin,
>>>>>>>>
>>>>>>>>
>>>>>>>>  I'm currently working on view composition using the brand new
>>>>>>>>> akka-stream module. Basic idea is to make views stream producers and
>>>>>>>>> to use the akka-stream DSL to merge message/event streams from several
>>>>>>>>> producers into whatever you need. See also
>>>>>>>>> https://twitter.com/mrt1nz/status/457120534111981569 for a first
>>>>>>>>> running example.
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>
>>>>>>>>  First of all: nice stuff! I think this is useful for the system
>>>>>>>> at my hands (real-time patient monitoring based on medical data).
>>>>>>>> I've seen the streams announcements but did not dive into that yet.
>>>>>>>> Looking at your code in StreamExample.scala, it more or less 'clicks' in
>>>>>>>> concept (and hopefully in the right way).
>>>>>>>>
>>>>>>>>  From a 'View' perspective, as currently available in
>>>>>>>> akka-persistence, every producing actor needs a view attached to it in
>>>>>>>> order to push the events to the streams producer, right? (When I look
>>>>>>>> at the ViewProducer.scala code, this is what is done.)
>>>>>>>>
>>>>>>>>  PersistentFlow.fromProcessor("p1").toProducer(materializer)
>>>>>>>>  Now, I have a sharding cluster with an EventsourcedProcessor (expect
>>>>>>>> 10,000s of these EventsourcedProcessor actor instances), so I'll need
>>>>>>>> to create a line like this for every EventsourcedProcessor in order to
>>>>>>>> get the stream of events together. Thereafter, I need to merge them
>>>>>>>> together to get a single stream of events (at least that is one of the
>>>>>>>> features of using the streams).
>>>>>>>>
>>>>>>>>
>>>>>>>> Every processor instance itself could create such a producer during
>>>>>>>> start and send it to another actor that merges received producers.
>>>>>>>>
>>>>>>> That would not allow me to implement 'View' (as is known in the
>>>>>>> persistence package) in order to listen to events within my cluster of
>>>>>>> aggregate root instances; I'll need to build something additional for
>>>>>>> that (as View is more used for the collection of those events and
>>>>>>> thereafter will push them through).
>>>>>>>
>>>>>>> At this moment, I use an akka extension (gives more or less a
>>>>>>> singleton) that is used directly in the EventsourcedProcessor after
>>>>>>> storage (persist) of the event.
>>>>>>> Thereafter I have listeners that get these events and transform them
>>>>>>> into data that needs storage for a certain type of view (CQRS style)
>>>>>>> (this is where I expected the 'View' to be used).
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>  My goal is to have 'Listeners' (that is my interpretation of a
>>>>>>>> 'View' due to historic reasons...) that will for instance update a data
>>>>>>>> store; this will probably happen on just a few nodes (maybe 1 and
>>>>>>>> some failover stuff). These 'Listeners' need to attach to the sharded
>>>>>>>> Eventsourced system and ask to get all event sourced events forwarded
>>>>>>>> (publish-subscribe, more or less).
>>>>>>>>
>>>>>>>>  I wonder if the current View (or ViewProducer) fits this
>>>>>>>> situation due to the fact you need to create as many views as
>>>>>>>> EventsourcedProcessors are created.
>>>>>>>> With the merged streams thereafter, it seems a possibility to have
>>>>>>>> just one thing per node (I assume actor) that will do the writing to a
>>>>>>>> data store (not being the eventstore).
>>>>>>>> What would be the way to get these Views 'automagically' attached
>>>>>>>> to the proper processors?
>>>>>>>>
>>>>>>>>
>>>>>>>> See above.
>>>>>>>>
>>>>>>>>
>>>>>>>>  And do you have a pointer to how this issue is solved with its own
>>>>>>>> eventstore? In a sharding cluster, you more or less have the same
>>>>>>>> issue (would streams change your approach there?).
>>>>>>>>
>>>>>>>>
>>>>>>>> There's one journal actor per ActorSystem where n journal actors in
>>>>>>>> a cluster update a replicated journal
>>>>>>>> <http://akka.io/community/#plugins_to_akka_persistence>.
>>>>>>>>
>>>>>>> Thanks, that's not done as an akka extension, right? (Why not?)
>>>>>>>
>>>>>>>>
>>>>>>>> Hope that helps.
>>>>>>>>
>>>>>>> Certainly, really appreciate your patience.
>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Martin
>>>>>>>>
>>>>>>>
>>>>>>> Kind regards,
>>>>>>>
>>>>>>> Olger
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>>>> ---
>>>>>>> You received this message because you are subscribed to the Google
>>>>>>> Groups "Akka User List" group.
>>>>>>> To unsubscribe from this group and stop receiving emails from it,
>>>>>>> send an email to [email protected].
>>>>>>> To post to this group, send email to [email protected].
>>>>>>>
>>>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Patrik Nordwall
>>>>> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
>>>>> Twitter: @patriknw
>>>>>
>>>>
>>>
>>>
>>>
>



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw
