On 26.02.14 14:09, delasoul wrote:
Hello Martin,
Thanks for your detailed answer; please find more questions/remarks inline:
On Wednesday, 26 February 2014 08:27:26 UTC+1, Martin Krasser wrote:
Hi Michael,
with akka-persistence you have several options to maintain
persistent read models:
1.) processor -> channel -> destination processor. The destination
processor itself writes messages (received via channel) to the
journal.
2.) eventsourced processor -> (target) processor. The eventsourced
processor sends events to the (target) processor from within its
command handler. When using reliable event delivery
<http://doc.akka.io/docs/akka/2.3.0-RC4/scala/persistence.html#reliable-event-delivery>,
this is more or less equivalent to 1 (from the read model
perspective).
3.) processor or eventsourced processor | view. Here, the view
reads directly from a processor's journal. The processor or
eventsourced processor doesn't send any messages directly to the view.
Mixing 3 with other options is not a good idea, as you already
mentioned. What are the differences between these three options?
ref 1.) at-least-once delivery semantics, but the destination processor
must be able to deal with duplicates as well as out-of-order
message delivery. Whether this is an issue depends on the
application-specific processing logic.
ref 2.) at-most-once delivery semantics, i.e. the (target) processor
must assume that messages can be lost. With reliable event
delivery, see ref 1.
ref 3.) at-least-once delivery semantics, i.e. failed reads (=
incremental replays) from a processor's journal are retried by the
view, but a view can make ordering assumptions here: if it
maintains a lastProcessedSequenceNr field, it can safely ignore
any messages with a sequence number <= lastProcessedSequenceNr.
Read models for which message ordering matters should be
implemented as views.
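Here is a minimal plain-Scala sketch of the ref 3 idea. It uses hypothetical JournalEntry and ReadModel types and is not the akka-persistence View API. Entries that a retried incremental replay reads a second time are dropped, because their sequence number is not greater than lastProcessedSequenceNr.

```scala
// Hypothetical names (JournalEntry, ReadModel); this is not akka-persistence API.
final case class JournalEntry(sequenceNr: Long, payload: String)

final class ReadModel {
  // Highest sequence number applied so far
  private var lastProcessedSequenceNr: Long = 0L
  private var items: Vector[String] = Vector.empty

  def state: Vector[String] = items

  // Entries at or below lastProcessedSequenceNr were already applied
  // (e.g. re-read by a retried incremental replay) and are ignored.
  def handle(entry: JournalEntry): Unit =
    if (entry.sequenceNr > lastProcessedSequenceNr) {
      items :+= entry.payload
      lastProcessedSequenceNr = entry.sequenceNr
    }
}
```

This relies on entries from a single processor's journal arriving in sequence order. A channel destination (ref 1) can't make that assumption and would need something else, such as remembering which messages it has already processed.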
Thanks for this summary, which confirms my understanding of the
documentation. Regarding option 2: this would mean that the same event
is persisted twice (by the eventsourced processor and by the receiving
processor)?
yes
Also, from the documentation I understand that when using reliable
messaging via channels, events should be emitted in the
eventsourced processor's event handler, as message resending may
also need to be done when replaying? (The channel checks for message
confirmation but does not automatically resend unconfirmed messages
after a restart.)
Using a channel from within an event handler can re-deliver unconfirmed
messages, but only as long as the JVM (containing the channel and
the sending processor) doesn't crash. If the JVM crashes and the
application is restarted, unconfirmed messages will not be re-delivered.
To achieve re-delivery after JVM crashes you need to use the channel
also from within receiveRecover (see reliable event delivery
<http://doc.akka.io/docs/akka/2.3.0-RC4/scala/persistence.html#reliable-event-delivery>).
The event handler itself is not invoked again during recovery.
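To illustrate why the channel must also be used from receiveRecover, here is a rough plain-Scala model. ConfirmingChannel is a hypothetical class, not the real Channel API. In this model, `confirmations` stands in for the confirmations kept in the journal, so it survives a crash, while `delivered` lives only in memory. Replaying all events through the channel during recovery then re-delivers only the ones that were never confirmed.

```scala
import scala.collection.mutable

// Hypothetical model of reliable event delivery; not the akka-persistence
// Channel API. `confirmations` survives a restart (like journaled
// confirmations); `delivered` is in-memory only.
final class ConfirmingChannel(confirmations: mutable.Set[Long]) {
  val delivered: mutable.ArrayBuffer[Long] = mutable.ArrayBuffer.empty

  // Skip messages the destination has already confirmed.
  def deliver(sequenceNr: Long): Unit =
    if (!confirmations.contains(sequenceNr)) delivered += sequenceNr

  def confirm(sequenceNr: Long): Unit = confirmations += sequenceNr
}
```

For example, suppose events 1, 2 and 3 are delivered and only 1 and 3 are confirmed before the JVM crashes. A new channel instance that replays 1, 2, 3 during recovery delivers only event 2 again.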
more inline
More inline ...
On 25.02.14 19:25, delasoul wrote:
Hello,
following the improvement suggestions in the latest cluster
sharding activator example I have added a persistent view and a
replicated journal using
https://github.com/ddevore/akka-persistence-mongo/.
As usual everything works as announced and I think
persistence/cluster sharding is really great and we'll be using
it a lot in the future.
Now, I am not too sure how to best use persistent views:
I have added an AuthorListingView (please see the code at the end of
this post). The view is also sharded, and I have deactivated auto
updates, as I think in most cases I will not find the right
update interval: either updates are attempted too often without
any changes having happened, or the delay between a change happening
and the view being updated is too long.
So, I create the AuthorListing with the view's shardRegion:
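ClusterSharding(system).start(
  typeName = AuthorListing.shardName,
  entryProps = Some(AuthorListing.props(
    ClusterSharding(system).shardRegion(AuthorListingView.shardName))),
  // idExtractor/shardResolver of AuthorListing, as in the sharding sample
  idExtractor = AuthorListing.idExtractor,
  shardResolver = AuthorListing.shardResolver)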
and in AuthorListing when it gets a PostSummary msg I update the
corresponding view "manually" via the shardRegion:
def receive = {
  case Persistent(s: PostSummary, _) =>
    posts :+= s
    log.info("Post added to {}'s list: {}", s.author, s.title)
    view ! ListingChanged(s.author)
}
Forgetting about the duplicate sending of DoUpdate msgs when
replaying (I think Views will mostly be used with EventSourced
processors anyway), this works,
but it still doesn't feel right: I could just send the
view the persisted msg (command or event) directly, without the
Update, and save a journal query.
Here you mix option 3 (see above) with another one.
Furthermore, views never write to a journal.
The ListingChanged event is just a wrapper for cluster sharding,
so that it can route based on the author; the ListingChanged event is never
received by the view:
case ListingChanged(author) => (author, Update(await = true))
I send an Update msg and the view has to query the journal for the
persisted msg to update its state (and as I am sending a msg anyway...)
Also, it is probably not a good solution when the same event has to
be sent to more than one view (using auto update would of course fix
this problem...).
You can always use a view (option 3) or a destination/target processor
(options 1 and 2) that distribute their events to plain actors without
using channels. This way you save redundant journal updates or queries.
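Here is a plain-Scala sketch of that fan-out. Fanout is a hypothetical class, and the functions stand in for the plain actors (in Akka they would be ActorRefs receiving `!`). Each event is read from the journal once and handed to every in-memory read model, so adding another read model adds neither a journal query nor a journal write.

```scala
// Hypothetical sketch: one reader (a view or destination processor) consumes
// each event once and hands it to several in-memory read models.
final class Fanout[E](subscribers: Seq[E => Unit]) {
  def publish(event: E): Unit = subscribers.foreach(subscriber => subscriber(event))
}
```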
Would it be a good idea to update a view's state by just sending
"normal" events (and never using auto-update or Update msgs), while of
course still using the replay and snapshot functionality of a view for
starting/restarting?
No, see initial comments.
When updating views by reading the journal instead of using
"messaging", won't the journal become a bottleneck at some point?
Most replicated journals should be able to scale reads well
(preferably with a configurable read consistency). For actor systems
on nodes that only maintain read models, one could also develop a
journal plugin that is optimized for very fast, scalable reads (e.g.
reading from a datastore that receives updates from the datastore that
accepts writes from processors).
We were thinking about playing with tailable cursors/capped
collections (we are using Mongo), but haven't had time so far, and these
come with certain limitations (fixed size, no sharding support), so
maybe they are not the best fit...
At the moment (we are using eventsourced) we do it the classical way:
the command side persists to the event store and updates the view
side by emitting events, and the view persists the view model again.
This is option 1 above (unless you mean that the view directly writes
its current state to a database, for example).
Yes, that's how it is at the moment: the view is not a processor
and persists its state to a DB.
But I really like the idea to also keep the view in memory when
needed, update it, shut it down when not used and bring it back
easily by replaying the corresponding processor or snapshotting
the view.
This is possible with all options 1-3 described above.
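For the in-memory case, bringing a read model back boils down to "latest snapshot plus the journal entries written after it". The sketch below is hypothetical plain Scala: Snapshot and Recovery are illustrative names, not the akka-persistence snapshot API. It assumes the journal is ordered by sequence number.

```scala
// Hypothetical sketch: rebuild an in-memory read model from an optional
// snapshot plus the journal entries written after the snapshot.
final case class Snapshot(sequenceNr: Long, posts: Vector[String])

object Recovery {
  def recover(snapshot: Option[Snapshot], journal: Seq[(Long, String)]): Snapshot = {
    // Without a snapshot, start from an empty state and replay everything.
    val start = snapshot.getOrElse(Snapshot(0L, Vector.empty))
    journal
      .filter { case (sequenceNr, _) => sequenceNr > start.sequenceNr }
      .foldLeft(start) { case (state, (sequenceNr, post)) =>
        Snapshot(sequenceNr, state.posts :+ post)
      }
  }
}
```

Recovering from a snapshot taken at sequence number 2 replays only entry 3 and yields the same state as a full replay from the start.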
The combination of an eventsourced processor and a view would be
my preference, but as I said, this is where I am struggling with the update
policy. In certain use cases it will be completely sufficient to
create and replay the view just on demand, but if it is kept in memory,
it has to be updated
That depends on what inconsistency window your application is willing to
accept. You can still read from the view at any time with eventual
consistency guarantees; Update(await = true) is only needed to achieve
strong read consistency. Furthermore, view updates are only incremental
updates (not full replays), so it may well be that in many cases they're
not too expensive.
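Here is a rough model of that trade-off, using a hypothetical PullView class in plain Scala (not the akka-persistence View). The readFrom function is an assumption standing in for "read journal entries with sequence number >= n". Reading `state` without updating may be stale. By contrast, `consistentState()` first catches up, which is the model's counterpart of sending Update(await = true) before a query. Each update reads only the entries after the last sequence number seen.

```scala
// Hypothetical sketch of a pull-based view (not the akka-persistence View API).
// readFrom(n) stands in for reading journal entries with sequenceNr >= n.
final class PullView(readFrom: Long => Seq[(Long, String)]) {
  private var lastSequenceNr = 0L
  private var posts = Vector.empty[String]
  var entriesRead = 0

  // Eventually consistent: only reflects entries read by the last update.
  def state: Vector[String] = posts

  // Incremental update: read only entries newer than the last one applied.
  def update(): Unit = {
    val newer = readFrom(lastSequenceNr + 1)
    entriesRead += newer.size
    newer.foreach { case (sequenceNr, post) =>
      posts :+= post
      lastSequenceNr = sequenceNr
    }
  }

  // Catch up with the journal first, then answer (strong read consistency).
  def consistentState(): Vector[String] = { update(); posts }
}
```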
A later useful extension to akka-persistence could be push-based views
(in addition to the current pull-based views). They are harder to
implement as they require distributed pub/sub that preserves message order.
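To make the ordering requirement concrete: if a push-based transport could reorder or duplicate messages, each view would need something like the resequencer below. It is hypothetical plain Scala, not an Akka API. It buffers events that arrive early and applies them strictly by sequence number.

```scala
import scala.collection.mutable

// Hypothetical sketch: apply pushed events strictly in sequence-number order,
// buffering early arrivals and dropping duplicates of already-applied events.
final class Resequencer[E](deliver: E => Unit) {
  private var nextSequenceNr = 1L
  private val pending = mutable.Map.empty[Long, E]

  def receive(sequenceNr: Long, event: E): Unit = {
    // Anything below nextSequenceNr was already applied: a duplicate.
    if (sequenceNr >= nextSequenceNr) pending(sequenceNr) = event
    // Apply the contiguous run of events starting at nextSequenceNr.
    while (pending.contains(nextSequenceNr)) {
      deliver(pending.remove(nextSequenceNr).get)
      nextSequenceNr += 1
    }
  }
}
```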
- maybe option 1 is then better, because the sent event carries the
needed data, but on the other hand there is the "manual" view Update
message?
Hope that helps,
Helps a lot, thanks again,
michael
Cheers,
Martin
I think by now I have forgotten everything else I wanted to
ask...
Thanks in advance for any suggestions,
michael
import scala.collection.immutable
import akka.actor.ActorLogging
import akka.contrib.pattern.ShardRegion
import akka.persistence.{ Persistent, Update, View }
// AuthorListing and PostSummary come from the cluster sharding sample (not shown)

object AuthorListingView {
  case class GetPosts(author: String)
  case class Posts(list: immutable.IndexedSeq[PostSummary])
  case class ListingChanged(author: String)

  // ListingChanged is only a routing wrapper: the view receives Update(await = true)
  val idExtractor: ShardRegion.IdExtractor = {
    case p @ Persistent(s: PostSummary, _) => (s.author, p)
    case ListingChanged(author)            => (author, Update(await = true))
    case m: GetPosts                       => (m.author, m)
  }

  val shardResolver: ShardRegion.ShardResolver = {
    case Persistent(s: PostSummary, _) => (math.abs(s.author.hashCode) % 100).toString
    case ListingChanged(author)        => (math.abs(author.hashCode) % 100).toString
    case GetPosts(author)              => (math.abs(author.hashCode) % 100).toString
  }

  val shardName: String = "AuthorListingView"
}
class AuthorListingView extends View with ActorLogging {
  import AuthorListingView._

  log.info(s"viewId: $viewId and processorId: $processorId")

  // Read the journal of the AuthorListing processor with the same entry id
  override def processorId: String =
    this.viewId.replace(shardName, AuthorListing.shardName)

  // Updates are triggered explicitly via ListingChanged -> Update(await = true)
  override def autoUpdate = false

  var posts = Vector.empty[PostSummary]

  override def receive: Receive = {
    case Persistent(s: PostSummary, _) =>
      log.info(s"View for ${self.path.name} received summary: $s")
      posts :+= s
    case GetPosts(_) => sender() ! Posts(posts)
  }
}
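A side note on the shardResolver above: the same hashing expression appears three times, and math.abs(Int.MinValue) is still Int.MinValue, so for that one hash value the shard id comes out as "-48". The hypothetical ShardIds helper below (not part of the sample or of Akka) uses Math.floorMod (Java 8+), which always returns a value in [0, numberOfShards).

```scala
// Hypothetical helper: compute the shard id for a key in one place.
// Math.floorMod always yields a value in [0, numberOfShards), unlike
// math.abs(h) % n, which is negative for h == Int.MinValue.
object ShardIds {
  val numberOfShards = 100

  def forHash(hash: Int): String = Math.floorMod(hash, numberOfShards).toString

  def forKey(key: String): String = forHash(key.hashCode)
}
```

Each case of shardResolver could then return ShardIds.forKey(s.author) or ShardIds.forKey(author). Note that floorMod maps all negative hash codes differently from math.abs (e.g. -42 becomes "58" rather than "42"). Changing the formula in a running system would therefore move entries to other shards, so it is best decided up front.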
--
Read the docs: http://akka.io/docs/
Check the FAQ: http://akka.io/faq/
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.
--
Martin Krasser
blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz