Given that fact, I went ahead and implemented a ShardAllocationStrategy that 
takes different entry types into account when deciding which nodes to 
create new entries on. First, I want to describe the use case that led me 
to implement this.

I'm working on a pipeline that ingests real-time social media data and 
processes it. The pipeline gets this data from different streams (i.e. one 
for Twitter, one for Facebook, etc.), so I wanted to take advantage of the 
high availability and scalability that Akka's clustering provides. I 
considered using singletons, but that would create a single point of 
failure: if my eldest node goes down, I lose all of my data streams until 
they are able to start up on a different node. Singletons would also have 
created a resource bottleneck on the eldest node. I therefore decided to 
use sharding to distribute my different streams throughout the cluster. 
While sharding provides the distribution that singletons lack, it only 
works when sharding the same entry type (as you, Patrik, pointed out and I 
came to find out). This presented a problem, since in my case an entry 
corresponds to a stream and no two streams are really the same. I did try 
doing something like this:

ClusterSharding.get(system).start("Feed", StreamActor.props(), new MessageExtractor());

where, instead of initializing the stream through its constructor, I would 
initialize it by passing in the necessary arguments through a series of 
messages using ".tell()". This appeared promising at first, until I reached 
a point where, regardless of how generic I made my interfaces, there was 
always something a 3rd party library required that couldn't be abstracted 
from the Objects that Akka receives. That left me with a bunch of 
"unchecked cast" warnings (which are a "no no" for me). This led to my only 
other option, which was to implement my own ShardAllocationStrategy that 
could work across different entry types. It looks like this:

public class FeedShardAllocationStrategy implements ShardCoordinator.ShardAllocationStrategy {
    // Number of shards allocated so far, keyed by node address and shared
    // by every entry type that is started with this strategy instance.
    private final HashMap<String, Long> shardCount;

    public FeedShardAllocationStrategy(HashMap<String, Long> shardCount) {
        Validate.notNull(shardCount, "shardCount must not be null.");

        this.shardCount = shardCount;
    }

    @Override
    public synchronized ActorRef allocateShard(ActorRef requester, String shardId,
            Map<ActorRef, IndexedSeq<String>> currentShardAllocations) {
        Iterator<ActorRef> shardRegions = currentShardAllocations.keysIterator();

        // Start from the region LeastShardAllocationStrategy would pick; the
        // loop below overrides it with the region that has the lowest tracked count.
        ActorRef leastPopulatedShardRegion = new ShardCoordinator.LeastShardAllocationStrategy(1, 3)
                .allocateShard(requester, shardId, currentShardAllocations);
        long leastPopulatedShardCount = Long.MAX_VALUE;

        while (shardRegions.hasNext()) {
            ActorRef shardRegion = shardRegions.next();
            String address = shardRegion.path().address().toString();
            Long count = shardCount.get(address);

            if (count == null) {
                // Nothing has been allocated on this node yet.
                leastPopulatedShardCount = 0;
                leastPopulatedShardRegion = shardRegion;
            } else if (count < leastPopulatedShardCount) {
                leastPopulatedShardCount = count;
                leastPopulatedShardRegion = shardRegion;
            }
        }

        // Record the new shard against the chosen node's address.
        shardCount.put(leastPopulatedShardRegion.path().address().toString(),
                leastPopulatedShardCount + 1);
        return leastPopulatedShardRegion;
    }

    @Override
    public Set<String> rebalance(
            scala.collection.immutable.Map<ActorRef, scala.collection.immutable.IndexedSeq<String>> currentShardAllocations,
            Set<String> rebalanceInProgress) {
        // Never move shards, so running streams are not stopped by a rebalance.
        return new HashSet<>();
    }
}

As you can see in the allocateShard method, I use a HashMap to keep track 
of the number of shards that have been created on each node, keyed by the 
node's address. By default I assign my leastPopulatedShardRegion to be 
whatever ShardRegion the LeastShardAllocationStrategy provides. In the 
"rebalance" method I return an empty Set, since I don't want my streams to 
be stopped by a re-balance. This actually works quite well for my 
requirements, as long as every stream type is started with the same 
instance of the ShardAllocationStrategy. For example, doing:

ShardCoordinator.ShardAllocationStrategy shardAllocationStrategy =
    new FeedShardAllocationStrategy(new HashMap<>());

ClusterSharding.get(system).start("TwitterStream",
    StreamActor.props(new TwitterStreamManager()),
    new MessageExtractor(), shardAllocationStrategy);
ClusterSharding.get(system).start("FacebookStream",
    StreamActor.props(new FacebookStreamManager()),
    new MessageExtractor(), shardAllocationStrategy);
ClusterSharding.get(system).start("GooglePlusStream",
    StreamActor.props(new GooglePlusStreamManager()),
    new MessageExtractor(), shardAllocationStrategy);

allows for my streams to be evenly distributed across my cluster while at 
the same time allowing me to implement different behavior for each one 
without compromising the "cleanliness" of the rest of my application (in 
addition to having the streams restart if the node they are on goes down).
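
To make the point about sharing one strategy instance concrete, here is a 
minimal plain-Java sketch of just the counting logic, with no Akka types 
involved. The class name SharedCounterAllocationSketch, the node addresses 
and the allocate() method are all made up for illustration, and ties go to 
the first address in the list, which is a simplification of what my real 
strategy does. With one shared instance the three stream types end up on 
three different nodes; with a fresh instance per type, every counter starts 
empty and they all land on the first node.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of the shared-counter allocation. */
final class SharedCounterAllocationSketch {
    // One counter map for all entry types, keyed by node address.
    private final Map<String, Long> shardCount = new HashMap<>();

    /** Picks the address with the fewest tracked shards and records the allocation. */
    synchronized String allocate(List<String> regionAddresses) {
        if (regionAddresses.isEmpty()) {
            throw new IllegalArgumentException("regionAddresses must not be empty");
        }
        String leastPopulated = null;
        long leastCount = Long.MAX_VALUE;
        for (String address : regionAddresses) {
            // Addresses that were never used count as zero shards.
            long count = shardCount.getOrDefault(address, 0L);
            if (count < leastCount) {
                leastCount = count;
                leastPopulated = address;
            }
        }
        shardCount.merge(leastPopulated, 1L, Long::sum);
        return leastPopulated;
    }

    public static void main(String[] args) {
        // Illustrative addresses only; a real cluster reports its own.
        List<String> nodes = Arrays.asList(
                "akka.tcp://sys@host1:2551",
                "akka.tcp://sys@host2:2551",
                "akka.tcp://sys@host3:2551");
        List<String> types = Arrays.asList(
                "TwitterStream", "FacebookStream", "GooglePlusStream");

        // Shared instance: each type goes to a different node.
        SharedCounterAllocationSketch shared = new SharedCounterAllocationSketch();
        for (String type : types) {
            System.out.println("shared   " + type + " -> " + shared.allocate(nodes));
        }

        // Separate instances: each one sees empty counters and picks the first node.
        for (String type : types) {
            String node = new SharedCounterAllocationSketch().allocate(nodes);
            System.out.println("separate " + type + " -> " + node);
        }
    }
}
```

This is also why the counter is keyed by node address rather than by 
ShardRegion: each entry type gets its own ShardRegion actor on a node, but 
they all share the node's address.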

Now I realize that sharding isn't really meant to work across different 
entry types. It is meant to scale out the functionality of a particular 
entry type by creating multiple instances of it throughout a cluster, 
unlike the way I'm using it, where I only have a single instance of each 
entry type running in the entirety of my cluster. I guess I just had a very 
specific use case that I wanted to implement in Akka without having to 
involve additional third party tools. What do you guys think? Thoughts, 
comments?
