Given that fact, I went ahead and implemented a ShardAllocationStrategy that
takes the different entry types into account when deciding which node a new
entry should be created on. First, I want to describe the use case that led
me to implement this.

I'm working on a pipeline that will ingest real-time social media data and
process it. The pipeline will get this data from different streams (i.e. one
for Twitter, one for Facebook, etc.). I wanted to take advantage of the high
availability and scalability that Akka's clustering provides. I considered
using singletons, but that would create a single point of failure: if my
eldest node goes down, I lose all of my data streams until they're able to
start up on a different node. Singletons would also have created a resource
bottleneck on the eldest node. So I decided to use sharding to distribute my
different streams throughout the cluster.

While sharding provides the distribution that singletons lack, it only works
when sharding the same entry type (as you, Patrik, pointed out and I came to
find out). This presented a problem, since in my case an entry corresponds to
a stream and no two streams are really the same. I did try doing something
like this:

    ClusterSharding.get(system).start("Feed", StreamActor.props(),
        new MessageExtractor());

where, instead of initializing the stream through its constructor, I would
initialize it by passing in the necessary arguments through a series of
messages using ".tell()". This looked promising at first, until I reached a
point where, regardless of how generic I made my interfaces, there was always
something a 3rd-party library required that couldn't be abstracted from the
Objects that Akka receives. That left me with a bunch of "unchecked cast"
warnings (which are a "no-no" for me). This led to my only other option,
which was to implement my own ShardAllocationStrategy that could work across
different entry types. It looks like this:

    public class FeedShardAllocationStrategy implements
            ShardCoordinator.ShardAllocationStrategy {

        // Shards allocated so far, keyed by shard region address.
        // This is a java.util.HashMap; Map, IndexedSeq, Set and Iterator
        // below are the Scala collection types used by the strategy interface.
        private final HashMap<String, Long> shardCount;

        public FeedShardAllocationStrategy(HashMap<String, Long> shardCount) {
            Validate.notNull(shardCount, "shardCount must not be null.");
            this.shardCount = shardCount;
        }

        @Override
        public ActorRef allocateShard(ActorRef requester, String shardId,
                Map<ActorRef, IndexedSeq<String>> currentShardAllocations) {
            synchronized (this) {
                // Fallback: whatever LeastShardAllocationStrategy would pick.
                ActorRef leastPopulatedShardRegion =
                        new ShardCoordinator.LeastShardAllocationStrategy(1, 3)
                                .allocateShard(requester, shardId,
                                        currentShardAllocations);
                long leastPopulatedShardCount = Long.MAX_VALUE;

                Iterator<ActorRef> shardRegions =
                        currentShardAllocations.keysIterator();
                while (shardRegions.hasNext()) {
                    ActorRef shardRegion = shardRegions.next();
                    String address = shardRegion.path().address().toString();
                    Long count = shardCount.get(address);
                    if (count == null) {
                        // Nothing recorded for this region yet.
                        leastPopulatedShardCount = 0;
                        leastPopulatedShardRegion = shardRegion;
                    } else if (count < leastPopulatedShardCount) {
                        leastPopulatedShardCount = count;
                        leastPopulatedShardRegion = shardRegion;
                    }
                }

                // Record the new shard against the chosen region.
                shardCount.put(
                        leastPopulatedShardRegion.path().address().toString(),
                        leastPopulatedShardCount + 1);
                return leastPopulatedShardRegion;
            }
        }

        @Override
        public Set<String> rebalance(
                scala.collection.immutable.Map<ActorRef,
                        scala.collection.immutable.IndexedSeq<String>>
                        currentShardAllocations,
                Set<String> rebalanceInProgress) {
            // Never move shards, so running streams are not interrupted.
            // HashSet here is presumably scala.collection.immutable.HashSet,
            // since the return type is a Scala Set.
            return new HashSet<>();
        }
    }

As you can see in the allocateShard method, I use a HashMap to keep track of
how many shards have been created on each shard region, keyed by the region's
address. By default I set leastPopulatedShardRegion to whatever ShardRegion
the LeastShardAllocationStrategy provides, and then override it with the
region that has the lowest recorded count. For the "rebalance" method, since
I don't want my streams to stop during a rebalance, I return an empty Set so
that no shards are ever moved. This works quite well for my requirements as
long as every stream's entry type is started with the same instance of the
ShardAllocationStrategy. For example, doing:

    ShardCoordinator.ShardAllocationStrategy shardAllocationStrategy =
        new FeedShardAllocationStrategy(new HashMap<>());
    ClusterSharding.get(system).start("TwitterStream",
        StreamActor.props(new TwitterStreamManager()),
        new MessageExtractor(), shardAllocationStrategy);
    ClusterSharding.get(system).start("FacebookStream",
        StreamActor.props(new FacebookStreamManager()),
        new MessageExtractor(), shardAllocationStrategy);
    ClusterSharding.get(system).start("GooglePlusStream",
        StreamActor.props(new GooglePlusStreamManager()),
        new MessageExtractor(), shardAllocationStrategy);

allows for my streams to be evenly distributed across my cluster while at
the same time allowing me to implement different behavior for each one
without compromising the "cleanliness" of the rest of my application (in
addition to having the streams restart if the node they are on goes down).
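
To illustrate why sharing one strategy instance matters, here is a minimal,
Akka-free sketch of the same counting idea. It is only a model, not the
strategy itself: the class name SharedCounterAllocator, the allocate method
and the node-a/node-b/node-c addresses are all made up for the example, and
ties go to the first address in the list, which is a simplification of the
loop in allocateShard.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Akka-free model of the shared-counter allocation idea. */
final class SharedCounterAllocator {
    // Number of shards handed to each node address so far.
    private final Map<String, Long> shardCount = new HashMap<>();

    /** Picks the address with the fewest recorded shards and records the new one. */
    synchronized String allocate(List<String> regionAddresses) {
        if (regionAddresses.isEmpty()) {
            throw new IllegalArgumentException("no regions available");
        }
        String best = null;
        long bestCount = Long.MAX_VALUE;
        for (String address : regionAddresses) {
            // Addresses that were never used count as zero shards.
            long count = shardCount.getOrDefault(address, 0L);
            if (count < bestCount) {
                bestCount = count;
                best = address;
            }
        }
        shardCount.merge(best, 1L, Long::sum);
        return best;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node-a", "node-b", "node-c");

        // One shared instance: each "entry type" lands on a different node.
        SharedCounterAllocator shared = new SharedCounterAllocator();
        for (String type : Arrays.asList("TwitterStream", "FacebookStream", "GooglePlusStream")) {
            System.out.println("shared   " + type + " -> " + shared.allocate(nodes));
        }

        // One instance per entry type: every counter starts empty,
        // so every type picks the same node.
        for (String type : Arrays.asList("TwitterStream", "FacebookStream", "GooglePlusStream")) {
            System.out.println("separate " + type + " -> " + new SharedCounterAllocator().allocate(nodes));
        }
    }
}
```

With the shared instance the three types end up on node-a, node-b and node-c;
with a fresh instance per type they all end up on node-a, because each
instance only knows about the shards it allocated itself.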

Now, I realize that sharding isn't really meant to work across different
entry types. It is meant to scale out a particular entry type by creating
multiple instances of it throughout a cluster, whereas I only have a single
instance of each entry type running in the entire cluster. I guess I just had
a very specific use case that I wanted to implement in Akka without involving
additional third-party tools.

What do you guys think? Thoughts, comments?