Hi Luis,

On 2 Jun 2014, at 09:56, Luis Medina <[email protected]> wrote:

> Given that fact, I went ahead and implemented a ShardAllocationStrategy that 
> takes into account different entry types for figuring out what nodes to 
> create new entries in. First, I just want to describe the use-case that led 
> to me implementing this.
> 
> I'm working on a pipeline that will ingest real-time social media data and do 
> processing on it. This pipeline will be getting this data from different 
> streams (i.e., one for Twitter, one for Facebook, etc.). As such, I wanted to
> take advantage of the high availability and scalability features that Akka's 
> clustering feature provides. I considered using singletons for this except 
> that this would create a single point of failure in that if my eldest node 
> goes down I lose all of my data streams until they're able to start up on a 
> different node. Using singletons would have also created a resource 
> bottleneck on the eldest node. As such, I decided to make use of sharding in 
> order to distribute all of my different streams throughout my cluster. While 
> sharding provides the distribution that singletons lack, it only works when 
> sharding the same entry type (as you, Patrik, pointed out and I came to find
> out). This presented a problem since in my case an entry corresponded to a 
> stream and no 2 streams were really the same. I did try doing something like 
> this:
> 
> ClusterSharding.get(system).start("Feed", StreamActor.props(),
>         new MessageExtractor());
> 
> where instead of initializing the stream through its constructor, I would
> initialize it by passing in the necessary arguments through a series of
> messages using ".tell()". This appeared to be promising at first until I
> reached a point where, regardless of how generic I made my interfaces, there
> was always something that a 3rd party library required that couldn't be
> abstracted from the Objects that Akka receives, which left me with a bunch of
> "unchecked cast" warnings (which are a "no-no" for me). This led to my
> only other option, which was to implement my own ShardAllocationStrategy
> that could work across different entry types. It looks like this:
> 
> public class FeedShardAllocationStrategy implements 
> ShardCoordinator.ShardAllocationStrategy {
>     private HashMap<String, Long> shardCount;
> 
>     public FeedShardAllocationStrategy(HashMap<String, Long> shardCount) {
>         Validate.notNull(shardCount, "shardCount must not be null.");
> 
>         this.shardCount = shardCount;
>     }
> 
>     @Override
>     public ActorRef allocateShard(ActorRef actorRef, String s,
>             Map<ActorRef, IndexedSeq<String>> actorRefIndexedSeqMap) {
>         synchronized (this) {

There is no need to synchronize here: the allocation strategy is only invoked
by the ShardCoordinator, which runs as a Cluster Singleton.

>             Iterator<ActorRef> shardRegions = actorRefIndexedSeqMap.keysIterator();
> 
>             ActorRef leastPopulatedShardRegion =
>                     new ShardCoordinator.LeastShardAllocationStrategy(1, 3)
>                             .allocateShard(actorRef, s, actorRefIndexedSeqMap);
>             long leastPopulatedShardCount = Long.MAX_VALUE;
> 
>             while (shardRegions.hasNext()) {
>                 ActorRef shardRegion = shardRegions.next();
>                 String address = shardRegion.path().address().toString();
> 
>                 if (shardCount.get(address) == null) {

This can also be null after the ShardCoordinator singleton has been moved to a 
different cluster node, because the in-memory HashMap does not move with it. A 
stateful ShardAllocationStrategy is not likely to work as you expect in all 
cases.

>                     leastPopulatedShardCount = 0;
>                     leastPopulatedShardRegion = shardRegion;
>                 } else {
>                     long shardCount = this.shardCount.get(address);
> 
>                     if (shardCount < leastPopulatedShardCount) {
>                         leastPopulatedShardCount = shardCount;
>                         leastPopulatedShardRegion = shardRegion;
>                     }
>                 }
>             }
> 
>             shardCount.put(leastPopulatedShardRegion.path().address().toString(),
>                     leastPopulatedShardCount + 1);
>             return leastPopulatedShardRegion;
>         }
>     }
> 
>     @Override
>     public Set<String> rebalance(
>             scala.collection.immutable.Map<ActorRef, scala.collection.immutable.IndexedSeq<String>> actorRefIndexedSeqMap,
>             Set<String> stringSet) {
>         return new HashSet<>();
>     }
> }
> 
> As you can see in the allocateShard method, I use a HashMap, keyed by node
> address, to keep track of the number of shards that have been created on each
> node. By default I assign my leastPopulatedShardRegion to be whatever
> ShardRegion the LeastShardAllocationStrategy provides. For the "rebalance"
> method, since I don't want my streams to stop during a rebalance, I return an
> empty Set in order to prevent this. This actually works quite well with my
> requirements as long as each stream type is started with the same instance of
> the ShardAllocationStrategy. For example doing:
> 
> ShardCoordinator.ShardAllocationStrategy shardAllocationStrategy =
>         new FeedShardAllocationStrategy(new HashMap<>());
> 
> ClusterSharding.get(system).start("TwitterStream",
>         StreamActor.props(new TwitterStreamManager()),
>         new MessageExtractor(), shardAllocationStrategy);
> ClusterSharding.get(system).start("FacebookStream",
>         StreamActor.props(new FacebookStreamManager()),
>         new MessageExtractor(), shardAllocationStrategy);
> ClusterSharding.get(system).start("GooglePlusStream",
>         StreamActor.props(new GooglePlusStreamManager()),
>         new MessageExtractor(), shardAllocationStrategy);
> 
> allows for my streams to be evenly distributed across my cluster while at the 
> same time allowing me to implement different behavior for each one without 
> compromising the "cleanliness" of the rest of my application (in addition to 
> having the streams restart if the node they are on goes down).
> 
> Now I realize that sharding isn't really meant to work across different entry 
> types and that sharding is meant to be used to scale out the functionality of 
> a particular entry type by creating multiple instances of them throughout a 
> cluster, unlike the way that I'm using it where I only have a single instance 
> of the same entry type running in the entirety of my cluster. I guess I just 
> had a very specific use-case that I wanted to implement in Akka without 
> having to involve additional third party tools. What do you guys think? 
> Thoughts, comments?

Why not shard actors of a single kind in the normal way and have them 
supervise these different stream handlers? You could make a region of 
StreamSupervisors and then send three messages:

ClusterSharding.get(system).start("Streams", StreamSupervisor.props(),
        new MessageExtractor());
final ActorRef streams = ClusterSharding.get(system).shardRegion("Streams");
streams.tell(new CreateStream("TwitterStream",
        StreamActor.props(new TwitterStreamManager())), ActorRef.noSender());
// same for FB and G+
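
For this to work, the MessageExtractor has to derive an entry id and a shard id 
from each CreateStream message. Below is a minimal plain-Java sketch of that 
mapping. It does not use the Akka interfaces; the class StreamRouting, the 
reduced one-field CreateStream, and the shard count of 10 are assumptions made 
for illustration only.

```java
import java.util.Objects;

// Plain-Java model of the routing decision, not the Akka MessageExtractor API.
final class StreamRouting {

    // Hypothetical message. The real one would also carry the Props of the
    // stream actor to start; it is left out to keep the sketch Akka-free.
    static final class CreateStream {
        final String name;

        CreateStream(String name) {
            this.name = Objects.requireNonNull(name, "name must not be null");
        }
    }

    // Assumed shard count; any fixed positive number works for the sketch.
    static final int NUMBER_OF_SHARDS = 10;

    // One supervisor entry per stream name.
    static String entryId(CreateStream msg) {
        return msg.name;
    }

    // Stable shard id derived from the name; floorMod keeps it non-negative
    // even when hashCode() is negative.
    static String shardId(CreateStream msg) {
        return String.valueOf(Math.floorMod(msg.name.hashCode(), NUMBER_OF_SHARDS));
    }

    public static void main(String[] args) {
        CreateStream msg = new CreateStream("TwitterStream");
        System.out.println(entryId(msg) + " -> shard " + shardId(msg));
    }
}
```

Since the ids depend only on the stream name, every node computes the same 
entry and shard for a given stream.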

You could still use a shard allocator that never rebalances.
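
As a rough sketch of what such an allocator could look like without any state 
of its own, the plain-Java model below derives the shard counts from the 
current allocations that are passed into allocateShard on every call, and never 
proposes a rebalance. It is not written against the Akka interfaces: regions 
are modelled as Strings instead of ActorRefs, java.util collections stand in 
for the Scala ones, and the class name is hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of a stateless, never-rebalancing allocation strategy.
final class NeverRebalancingAllocation {

    // Returns the region that currently hosts the fewest shards. The counts
    // are read from the argument, so nothing is lost if the caller moves to
    // another node. Ties go to the first region in iteration order.
    static String allocateShard(Map<String, List<String>> currentAllocations) {
        if (currentAllocations.isEmpty()) {
            throw new IllegalArgumentException("no regions available");
        }
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, List<String>> entry : currentAllocations.entrySet()) {
            int count = entry.getValue().size();
            if (count < bestCount) {
                bestCount = count;
                best = entry.getKey();
            }
        }
        return best;
    }

    // Never asks for any shard to be moved, so running streams are not
    // stopped by a rebalance.
    static Set<String> rebalance(Map<String, List<String>> currentAllocations,
                                 Set<String> rebalanceInProgress) {
        return Collections.emptySet();
    }
}
```

With all streams in one "Streams" region, the allocations passed in already 
cover every stream, so no shared HashMap is needed.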

Regards,


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn

