On Thursday, November 19, 2015 at 12:32:38 PM UTC-8, Patrik Nordwall wrote:
>
> On Thu, Nov 19, 2015 at 8:06 PM, Jim Hazen <[email protected]> wrote:
>
>> Wait, what? So cluster sharding depends on shared mutable state across
>> your cluster to work? As far as I knew, the independent local nodes managed
>> their state internally, communicated/coordinated via network protocols and
>> delegated to a master when they needed to determine a shard owner for the
>> first time. All of this allows for local state, mutated locally with
>> discovered cluster information. Is this not the case? If so, why? This
>> seems contrary to the Akka/Actor model and many other clustering strategies.
>
> That is correct, but the decisions taken by the coordinator must be
> consistent, also when the coordinator crashes and fails over to another
> node. Therefore, the state of the new coordinator instance must be
> recovered to the exact same state as the previous coordinator instance. By
> default the coordinator uses Akka Persistence to store and recover this
> state. Distributed Data can be used as an alternative for storing this
> state.

The coordinator could broadcast or gossip its state to the cluster as it builds it. Or, since the state the coordinator really manages is which node owns a particular shard, it could simply request this information from live cluster participants as part of coordinator election (or rather immediately after). This should rebuild an accurate shard topology, and coordination could then proceed normally.
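To make the idea concrete, here's a minimal sketch in plain Scala (not the real ShardCoordinator API — the types and names here are hypothetical stand-ins): after election, the new coordinator polls every live region for the shards it currently hosts, folds the replies into a fresh ownership table, and can then tell regions to stop any shards the table no longer assigns to them.

```scala
object CoordinatorRecovery {
  type ShardId = String
  type Region  = String // stands in for a ShardRegion ActorRef

  // One reply per live region: "these are the shards I am currently running".
  final case class RegionState(region: Region, shards: Set[ShardId])

  // Rebuild shard -> owner from the replies. If two regions both claim a
  // shard (e.g. after a partial split), pick a deterministic winner so every
  // node computes the same table.
  def rebuild(replies: Seq[RegionState]): Map[ShardId, Region] =
    replies
      .flatMap(r => r.shards.map(shard => shard -> r.region))
      .groupBy(_._1)
      .map { case (shard, owners) => shard -> owners.map(_._2).min }

  // Given the new authoritative table, the shards a region must stop because
  // it no longer owns them.
  def shardsToStop(self: Region,
                   authoritative: Map[ShardId, Region],
                   running: Set[ShardId]): Set[ShardId] =
    running.filterNot(shard => authoritative.get(shard).contains(self))
}
```

The deterministic tie-break matters: without it, two nodes folding the same replies in a different order could disagree about who owns a contested shard, which is exactly the consistency problem Patrik describes.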
>> My product is using a distributed journal, since we also use persistence
>> along with cluster sharding. However we're plagued with clustering issues
>> when rolling new code and ripple-restarting nodes in the cluster. I was
>> hoping this would go away in 2.4, when I could go back to a local journal
>> for sharding state.
>
> That will not work, because when the coordinator fails over to another
> node it will not have access to the leveldb journal used by the previous
> coordinator; it will then recover to the wrong state and you will end up
> with entity actors with the same id running on several nodes. Not an option.

Are you talking about duplicate IDs within the new active cluster, or across both brains in the case of a split? Dupes within a cluster can be prevented if you rebuild the coordinator from the remaining nodes. Once the new coordinator takes over, it could easily broadcast a new authoritative ownership table to each node. If a node notices it is running IDs for shards it no longer owns, it can shut them down. This shouldn't happen much if the coordinator built its state from the active nodes in the first place.

In the case of a split, the active brain doesn't have much control over the IDs on the other side. It needs to rely on a resolution strategy shared by both sides to ensure consistency (which is why you have the split brain resolver).

>> The assumption being that an inter-cluster network partition was
>> corrupting the shared state on the distributed journal. Currently the only
>> way to recover my cluster in these situations is to shut down all nodes,
>> remove all shard entries from dynamo and restart the cluster nodes, 1 by
>> 1. This is on akka 2.3.10.
>
> We have seen two reasons for this issue.
> 1) Bugs in the journals, e.g. replaying events in wrong order.
> 2) Split brain scenarios (including network partitions, long GC, and
> system overload) causing split of the cluster into two separate clusters
> when using auto-downing. That will result in two coordinators (one in each
> cluster) writing to the same database and thereby making the event sequence
> corrupt. We recommend manual downing or Split Brain Resolver
> <http://doc.akka.io/docs/akka/rp-15v09p02/scala/split-brain-resolver.html>
> instead of auto-downing.

Splits are inevitable, and this implementation feels like it amplifies the problem. Once the inevitable happens you corrupt your state, because the OSS cluster sharding impl lacks a Split Brain Resolver. What's worse, this situation doesn't appear to be detected immediately. The split silently continues, with both coordinators blindly writing state and sharding getting more and more corrupt. You don't know how bad things are until you attempt to recover shard state from the journal; then, blammo, you end up with no brains.

It would be nice if the OSS product at least supported the static-quorum resolver, or honored the akka.cluster.min-nr-of-members value when cluster membership changes. It would appear that this value isn't checked if the members are already in the UP state and the cluster is simply electing a new coordinator. I was thinking of listening for membership changes myself and doing that within my app.

Another advantage of getting shard state out of your shared journal is the ability to point multiple clusters at that journal. The data keys within that journal can be hashed so as not to collide between clusters. I have use cases for this as well. However, doing this today would in effect intentionally cause a split brain, one that I'm not sure either cluster would immediately detect, since their configs and nodes would be independent.
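The static-quorum rule I'd want is simple enough to state as a pure function. This is only a sketch of the decision logic (not the commercial Split Brain Resolver's implementation): each side of a partition counts the members it can still reach and downs itself unless it holds a fixed quorum. If the quorum size is more than half the total cluster size, at most one side can survive.

```scala
object StaticQuorum {
  sealed trait Decision
  case object DownUnreachable extends Decision // our side holds quorum and keeps the cluster
  case object DownSelf        extends Decision // minority side shuts itself down

  // quorumSize must be > totalClusterSize / 2, otherwise both sides of a
  // near-even split could each believe they hold quorum.
  def decide(reachableMembers: Int, quorumSize: Int): Decision =
    if (reachableMembers >= quorumSize) DownUnreachable else DownSelf
}
```

In a 5-node cluster with quorumSize = 3, a 3/2 split downs the 2-node side, and a full 5-way partition downs everyone — which is the safe outcome for sharding, since no surviving coordinator means no corrupt journal writes.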
Maybe it could be done in 2.4 by picking different shared journals for shard storage and the same one for data storage. Dunno, not on 2.4 yet.

Manual downing isn't really an option either. I can't afford for access to keys on unreachable nodes to hang until a person can fix it; rather, the system needs to quickly relocate the actor somewhere else (but is free to hand it back if the original host becomes reachable again).

I can invest in using convergent structures for my data, but even if my data were convergent and tolerant of some split brain issues, I'd still have problems with my akka cluster not starting after a node restart. At least not without some extra work with the OSS product.

Don't get me wrong, I absolutely love Akka and all the work that has gone into Cluster Sharding. But now that Cluster Sharding is a fully supported module, it would be nice if it had at least one out-of-the-box way to deal with split brain issues.

Thanks,
Jim

PS: I'll also look into Distributed Data, and get on the latest 2.x version.

PPS: Ok, looked into ddata. Sounds like what I might be looking for. I assume that if all nodes go down we're back to square 1? Although if all nodes are down, you are back to square one, except if you wanted to use rememberEntities and benefit from local snapshot data. In that case, building up global state from a collection of persisted local states still appears to have benefits. This would be done as nodes re-form the cluster and before the cluster min size or prior quorum had been met (cluster metadata could be saved by each node and a consensus built prior to the final UPing of nodes).

> By the way, use latest version, i.e. 2.3.14 or 2.4.0.
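For what it's worth, the "different journal for shard storage" idea might look something like the fragment below in 2.4. This is a sketch only — the `journal-plugin-id` / `snapshot-plugin-id` and `state-store-mode` settings are what I understand 2.4 to offer (verify against your version's reference.conf), and the plugin ids are hypothetical names that would have to be configured as persistence plugins elsewhere.

```hocon
akka.cluster.sharding {
  # Hypothetical plugin ids: keep application events in the shared journal,
  # but route the sharding coordinator's state to a dedicated plugin.
  journal-plugin-id  = "sharding-journal"
  snapshot-plugin-id = "sharding-snapshots"

  # Or keep coordinator state out of persistence entirely and replicate it
  # with Distributed Data instead (experimental in 2.4):
  # state-store-mode = "ddata"
}
```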
>
> Regards,
> Patrik
>
>> java.lang.IllegalArgumentException: requirement failed: Region
>> Actor[akka://User/user/sharding/TokenOAuthState#1273636297] not registered:
>> State(Map(
>>   -63 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
>>    23 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
>>    40 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#1939420601],
>>    33 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#-1679280759],
>>    50 -> Actor[akka://User/user/sharding/TokenOAuthState#-1778736418],
>>   -58 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#-1679280759],
>>    35 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#-1679280759],
>>   -66 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#-1679280759],
>>   -23 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#1939420601],
>>   -11 -> Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#1939420601]
>> ), Map(
>>   Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#-1679280759] -> Vector(35, -58, -66, 33),
>>   Actor[akka://User/user/sharding/TokenOAuthState#-1778736418] -> Vector(-63, 23, 50),
>>   Actor[akka.tcp://[email protected]:8115/user/sharding/TokenOAuthState#1939420601] -> Vector(-23, 40, -11)
>> ), Set())
>>
>> On Wednesday, November 18, 2015 at 12:23:28 PM UTC-8, Patrik Nordwall wrote:
>>>
>>> Leveldb can't be used for cluster sharding, since that is a local
>>> journal. The documentation of persistence has links to distributed
>>> journals.
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>
> --
> Patrik Nordwall
> Typesafe <http://typesafe.com/> - Reactive apps on the JVM
> Twitter: @patriknw
