Hi Magnus, Thanks for your extended answer. Currently, I've solved it with application code and this thread and the thread before was about finding ways to support more use cases with akka persistence. The DDD Aggregate Root is one of them and that needs quite some plumbing to get it going. So with these discussions we've found a solution to 'tag' the stream of events and make use of that tag to replay to the views. (assuming that this is the correct interpretation of the conclusions)
I expected some work before the end of this year as that would be my production moment (gathering real time data of hospitals and running an algorithm over that) and this enhancements would remove a piece of plumbing that I'd prefer not to see there as it makes things hard to understand. In my current setup, the instances of an aggregate root are created via cluster sharding with all their unique ID as a perstent actor. Thereafter I need to aggregate the changes of all these ID's of the same type of aggregate root to a single view. Now I have a persistent actor that has the persistenceId of the type and just tells that a specific instance has changed. That allows me to trigger a view for that instance. So, Id like to be able to generate a view based on the type of aggregate root, having separate instances as persistent actors running and the 'tag' mechanism would allow me to. Now I am looking into a way to build this support in, in a non-intrusive way (e.g. that it will not break what is there and won't change too much of the current structure) I'd be concerned with the order of messages but not especially with the sequence number (expecting that the order will be OK) Writing the tag (aggregateId) along with the messages to persist is not much of an issue. (although the sequence numbering is per persistenceId) Replaying is a bit more of an issue as the current view and actor are both based on the Eventsourced class that is tightly coupled to write and read based on the persistenceId. That is fine for the actor (re-construct the single instance) but not for the view (give me all persistence events for this aggregate root type (tag). Kind regards, Olger On Saturday, April 25, 2015 at 4:10:08 PM UTC+2, Magnus Andersson wrote: > > Greg, I agree with you there. I do not disagree with convenience. :) > > But where are different kinds of convenience: > > As a devops person I want to minimize shared state in order to have pieces > of software that can fail and start independently. I want to sleep at night > and have free weekends. > > So the questions I hid in my text was: > What is the motivation to solve your problem in akka-persistence instead > of in your application code? > > Magnus > Den 25 apr 2015 1:57 em skrev "Greg Young" <[email protected] > <javascript:>>: > >> I as a developer want assured ordering across streams because my system >> does 200 events per second and linearization is a simpler model. >> >> On Saturday, April 25, 2015, Magnus Andersson <[email protected] >> <javascript:>> wrote: >> >>> Hi >>> >>> From your question it looks like you want to build up a persistent view >>> by merging journal streams using multiple persistence ids. That is a common >>> use case and my experience is that is is a bit cumbersome, but doable today. >>> >>> However you want strict replay ordering over multiple persistent actors. >>> If you have a requirement of strict ordering across aggregate roots it >>> sounds like a design flaw in your application, are you perhaps dividing up >>> you domain too granularly? >>> >>> My view your persistent actors should be your aggregate roots, period. >>> Your persistent actor can of course can have an eventual consistency >>> dependency to other actors, for deciding logic or validating input before >>> persisting >>> >>> That being said, for views there are often the need for merging streams >>> of events from multiple journals to build up an aggregated view. But if >>> your persistent actors are aggregate roots then it does not make sense that >>> the view would have any guarantee of the ordering. Events are things that >>> happened in the past so you don't need to validate them after the fact. >>> >>> Other types of ordering seems more like application specific problems. >>> Here are some suggestions: >>> >>> 1. *First come first serve ordering:* Setup an view aggregation >>> actor that is fed events from multiple journal sources. Your aggregate >>> actor is a persistent actor and will persist each messages in the >>> sequence >>> they arrive. You now have strict ordering in your aggregate actor and >>> replays will guaranteed to be in the same order the events arrived. Of >>> course this uses up extra storage and you need to keep track any >>> implicit >>> dependencies if you were to create multiple levels of these. >>> 2. *Timebased ordering:* If it makes sense in your application and >>> you trust the clock on your servers, you can relax your requirements and >>> include a persist timestamp in your message when journaling. When you >>> replay messages from two sources (persistent views) you can merges >>> events >>> into event stream buffer that sorts events based on the persist >>> timestamp >>> before emitting messages. >>> 3. *Shared sequence ordering: *basically your original idéa combined >>> with event stream buffer. You include an extra field which has sequence >>> numbers fed from your sequence source. Then replay into a journal stream >>> buffer that makes sure events are emitted in correct order. >>> >>> If you are thinking about a shared source for sequential ids, >>> then Twitter had something called snowflake >>> <https://github.com/twitter/snowflake> (written in Scala). The project >>> is deprecated now but the history and code is there. >>> >>> /Magnus >>> >>> Den fredag 24 april 2015 kl. 16:43:46 UTC+2 skrev Olger Warnier: >>>> >>>> Well, >>>> >>>> I found that the sequence numbers are actually generated on a per >>>> persistent actor instance basis. >>>> So that makes replay for a single aggregateId based with limits on the >>>> sequence numbers a bit of an intresting challenge >>>> >>>> Still interested in your opinions as that will have impact on the way >>>> to solve this (some kind of atomic sequence generator shared between >>>> aggregates ?) >>>> >>>> >>>> On Friday, April 24, 2015 at 10:42:04 AM UTC+2, Olger Warnier wrote: >>>>> >>>>> Hi Roland / List, >>>>> >>>>> I am looking into an addition/mutation to the Persistency layer that >>>>> allows storage of an aggregateId (more or less the whole 'tag' idea >>>>> without >>>>> being able to have multiple tags to start out with) with a replay (for >>>>> the >>>>> view) based on that aggregateId. (bit like the DDD AggregateRoot) >>>>> >>>>> Replay is started with a message that contains a start sequence and >>>>> assumes (logically) that the sequence will go up. >>>>> With regards to the aggregateId, replay is for all persistenceIds that >>>>> have registered this aggregateId. >>>>> >>>>> If you wish to allow replay on aggregate level, the sequenceId >>>>> (numbering) should be on aggregate level with as side effect that the >>>>> sequence numbering on persistenceId level will go up but with 'gaps'. >>>>> >>>>> When you are not dependent on a gapless series of persistence events, >>>>> that won't be an issue (just keep the last processed persistenceId >>>>> sequence >>>>> number for your snapshot, and it will still work) >>>>> >>>>> Any opinion on this ? >>>>> Somebody have a use case that requires gapless persistenceId sequence >>>>> numbers ? >>>>> >>>>> Kind regards, >>>>> >>>>> Olger >>>>> >>>>> >>>>> >>>>> On Friday, March 27, 2015 at 1:33:43 PM UTC+1, rkuhn wrote: >>>>>> >>>>>> Hi Murali, >>>>>> >>>>>> the core team at Typesafe cannot work on this right now (we need to >>>>>> finish Streams and HTTP first and have some other obligations as well), >>>>>> but >>>>>> Akka is an open-source project and we very much welcome contributions of >>>>>> all kinds. In this case we should probably start by defining more >>>>>> closely >>>>>> which queries to (initially) support and how to model them in the >>>>>> various >>>>>> backends, so that we can get a feel for how we shall change the Journal >>>>>> SPI. >>>>>> >>>>>> Regards, >>>>>> >>>>>> Roland >>>>>> >>>>>> 27 mar 2015 kl. 12:41 skrev Ganta Murali Krishna <[email protected]>: >>>>>> >>>>>> Hello Roland, >>>>>> >>>>>> Any news on this please. When we can expect implementation roughly? >>>>>> Your response will be really appreciated. >>>>>> >>>>>> Regards >>>>>> Murali >>>>>> >>>>>> On Wednesday, 27 August 2014 20:04:30 UTC+5:30, rkuhn wrote: >>>>>>> >>>>>>> Dear hakkers, >>>>>>> >>>>>>> there have been several very interesting, educational and productive >>>>>>> threads in the past weeks (e.g. here >>>>>>> <https://groups.google.com/d/msg/akka-user/SL5vEVW7aTo/KfqAXAmzol0J> >>>>>>> and here >>>>>>> <https://groups.google.com/d/msg/akka-user/4kbYcwWS2OI/hpmAkxnB9D4J>). >>>>>>> We have taken some time to distill the essential problems as well as >>>>>>> discuss the proposed solutions and below is my attempt at a summary. In >>>>>>> the >>>>>>> very likely case that I missed something, by all means please raise >>>>>>> your >>>>>>> voice. The intention for this thread is to end with a set of github >>>>>>> issues >>>>>>> for making Akka Persistence as closely aligned with CQRS/ES principles >>>>>>> as >>>>>>> we can make it. >>>>>>> >>>>>>> As Greg and others have confirmed, the write-side (PersistentActor) >>>>>>> is already doing a very good job, so we do not see a need to change >>>>>>> anything at this point. My earlier proposal of adding specific topics >>>>>>> as >>>>>>> well as the discussed labels or tags all feel a bit wrong since they >>>>>>> benefit only the read-side and should therefore not be a concern/duty >>>>>>> of >>>>>>> the write-side. >>>>>>> >>>>>>> On the read-side we came to the conclusion that PersistentView >>>>>>> basically does nearly the right thing, but it focuses on the wrong >>>>>>> aspect: >>>>>>> it seems most suited to track a single PersistentActor with some slack, >>>>>>> but >>>>>>> also not with back-pressure as a first-class citizen (it is possible to >>>>>>> achieve it, albeit not trivial). What we distilled as the core >>>>>>> functionality for a read-side actor is the following: >>>>>>> >>>>>>> >>>>>>> - it can ask for a certain set of events >>>>>>> - it consumes the resulting event stream on its own schedule >>>>>>> - it can be stateful and persistent on its own >>>>>>> >>>>>>> >>>>>>> This does not preclude populating e.g. a graph database or a SQL >>>>>>> store directly from the journal back-end via Spark, but we do see the >>>>>>> need >>>>>>> to allow Akka Actors to be used to implement such a projection. >>>>>>> >>>>>>> Starting from the bottom up, allowing the read-side to be a >>>>>>> PersistentActor in itself means that receiving Events should not >>>>>>> require a >>>>>>> mixin trait like PersistentView. The next bullet point means that the >>>>>>> Event >>>>>>> stream must be properly back-pressured, and we have a technology under >>>>>>> development that is predestined for such an endeavor: Akka Streams. So >>>>>>> the >>>>>>> proposal is that any Actor can obtain the ActorRef for a given Journal >>>>>>> and >>>>>>> send it a request for the event stream it wants, and in response it >>>>>>> will >>>>>>> get a message containing a stream (i.e. Flow) of events and some >>>>>>> meta-information to go with it. >>>>>>> >>>>>>> The question that remains at this point is what exactly it means to >>>>>>> “ask for a certain set of events”. In order to keep the number of >>>>>>> abstractions minimal, the first use-case for this feature is the >>>>>>> recovery >>>>>>> of a PersistentActor. Each Journal will probably support different >>>>>>> kinds of >>>>>>> queries, but it must for this use-case respond to >>>>>>> >>>>>>> case class QueryByPersistenceId(id: String, fromSeqNr: Long, >>>>>>> toSeqNr: Long) >>>>>>> >>>>>>> with something like >>>>>>> >>>>>>> case class EventStreamOffer(metadata: Metadata, stream: >>>>>>> Flow[PersistentMsg]) >>>>>>> >>>>>>> The metadata allows the recipient to correlate this offer with the >>>>>>> corresponding request and it contains other information as we will see >>>>>>> in >>>>>>> the following. >>>>>>> >>>>>>> Another way to ask for events was discussed as Topics or Labels or >>>>>>> Tags in the previous threads, and the idea was that the generated >>>>>>> stream of >>>>>>> all events was enriched by qualifiers that allow the Journal to >>>>>>> construct a >>>>>>> materialized view (e.g. a separate queue that copies all events of a >>>>>>> given >>>>>>> type). This view then has a name that is requested from the read-side >>>>>>> in >>>>>>> order to e.g. have an Actor that keeps track of certain aspects of all >>>>>>> persistent ShoppingCarts in a retail application. As I said above we >>>>>>> think >>>>>>> that this concern should be handled outside of the write-side because >>>>>>> logically it does not belong there. Its closest cousin is the >>>>>>> construction >>>>>>> of an additional index or view within a SQL store, maintained by the >>>>>>> RDBMS >>>>>>> upon request from the DBA, but available to and relied upon by the >>>>>>> read-side. We propose that this is also how this should work with Akka >>>>>>> Persistence: the Journal is free to allow the configuration of >>>>>>> materialized >>>>>>> views that can be requested as event streams by name. The extraction of >>>>>>> the >>>>>>> indexing characteristics is performed by the Journal or its backing >>>>>>> store, >>>>>>> outside the scope of the Journal SPI; one example of doing it this way >>>>>>> has >>>>>>> been implemented by Martin >>>>>>> <https://github.com/krasserm/akka-persistence-kafka/#user-defined-topics> >>>>>>> already. >>>>>>> We propose to access the auxiliary streams by something like >>>>>>> >>>>>>> case class QueryKafkaTopic(name: String, fromSeqNr: Long, toSeqNr: >>>>>>> Long) >>>>>>> >>>>>>> Sequence numbers are necessary for deterministic replay/consumption. >>>>>>> We had long discussions about the scalability implications, which is >>>>>>> the >>>>>>> reason why we propose to leave such queries proprietary to the Journal >>>>>>> backend. Assuming a perfectly scalable (but then of course not >>>>>>> real-time >>>>>>> linearizable) Journal, the query might allow only >>>>>>> >>>>>>> case class QuerySuperscalableTopic(name: String, fromTime: DateTime) >>>>>>> >>>>>>> This will try to give you all events that were recorded after the >>>>>>> given moment, but replay will not be deterministic, there will not be >>>>>>> unique sequence numbers. These properties will be reflected in the >>>>>>> Metadata >>>>>>> that comes with the EventStreamOffer. >>>>>>> >>>>>>> The last way to ask for events is to select them using an >>>>>>> arbitrarily powerful query at runtime, probably with dynamic parameters >>>>>>> so >>>>>>> that it cannot be prepared or materialized while writing the log. >>>>>>> Whether >>>>>>> and how this is supported by the Journal depends on the precise >>>>>>> back-end, >>>>>>> and this is very much deliberate: we want to allow the Journal >>>>>>> implementations to focus on different use-cases and offer different >>>>>>> feature >>>>>>> trade-offs. If a RDBMS is used, then things will naturally be >>>>>>> linearized, >>>>>>> but less scalable, for example. Document databases can extract a >>>>>>> different >>>>>>> set of features than when storing BLOBs in Oracle, etc. The user-facing >>>>>>> API >>>>>>> would be defined by each Journal implementation and could include >>>>>>> >>>>>>> case class QueryEventStoreJS(javascriptCode: String) >>>>>>> case class QueryByProperty(jsonKey: String, value: String, since: >>>>>>> DateTime) >>>>>>> case class QueryByType(clazz: Class[_], fromSeqNr: Long, toSeqNr: >>>>>>> Long) >>>>>>> case class QueryNewStreams(fromSeqNr: Long, toSeqNr: Long) >>>>>>> >>>>>>> The last one should elegantly solve the use-case of wanting to >>>>>>> catalog which persistenceIds are valid in the Journal (which has been >>>>>>> requested several times as well). As discussed for the >>>>>>> SuperscalableTopic, >>>>>>> each Journal would be free to decide whether it wants to implement >>>>>>> deterministic replay, etc. >>>>>>> >>>>>>> Properly modeling streams of events as Akka Streams feels like a >>>>>>> consistent way forward, it also allows non-actor code to be employed >>>>>>> for >>>>>>> doing stream processing on the resulting event streams, including >>>>>>> merging >>>>>>> multiple of them or feeding events into Spark—the possibilities are >>>>>>> boundless. I’m quite excited by this new perspective and look forward >>>>>>> to >>>>>>> your feedback on how well this helps Akka users implement the Q in CQRS. >>>>>>> >>>>>>> Regards, >>>>>>> >>>>>>> >>>>>>> *Dr. Roland Kuhn* >>>>>>> *Akka Tech Lead* >>>>>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM. >>>>>>> twitter: @rolandkuhn >>>>>>> <http://twitter.com/#!/rolandkuhn> >>>>>>> >>>>>>> >>>>>> -- >>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>> >>>>>>>>>> Check the FAQ: >>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>> >>>>>>>>>> Search the archives: >>>>>> https://groups.google.com/group/akka-user >>>>>> --- >>>>>> You received this message because you are subscribed to the Google >>>>>> Groups "Akka User List" group. >>>>>> To unsubscribe from this group and stop receiving emails from it, >>>>>> send an email to [email protected]. >>>>>> To post to this group, send email to [email protected]. >>>>>> Visit this group at http://groups.google.com/group/akka-user. >>>>>> For more options, visit https://groups.google.com/d/optout. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> *Dr. Roland Kuhn* >>>>>> *Akka Tech Lead* >>>>>> Typesafe <http://typesafe.com/> – Reactive apps on the JVM. >>>>>> twitter: @rolandkuhn >>>>>> <http://twitter.com/#!/rolandkuhn> >>>>>> >>>>>> >> >> -- >> Studying for the Turing test >> >> -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
