Hi,
I am trying to build a backend system with Akka Cluster and Cluster Sharding. Everything works well except for a performance issue with cluster sharding:
creating an entity actor through the shard region appears to be extremely slow (the entity actor is created when the first message is sent via the region actor's tell).
Has anyone run into this before, or is this simply how sharding works?
My test actors, in Java:
public class AuthService extends BaseService {

    public AuthService(ActorRef serviceActor) {
        super(serviceActor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(AuthRequest.class, authRequest -> {
                    getContext().system().log().info("Get auth request from client {}", authRequest.getToken());
                    this.getServiceActor().tell(authRequest, getSender());
                })
                .build();
    }

    public static void config(ActorSystem system) {
        ClusterSharding.get(system)
                .start(AuthServiceActor.SHARD,
                        Props.create(AuthServiceActor.class, AuthServiceActor::new),
                        ClusterShardingSettings.create(system),
                        AuthServiceActor.shardExtractor());

        ActorRef actorRef = ClusterSharding.get(system).shardRegion(AuthServiceActor.SHARD);
        ActorRef authService = system.actorOf(Props.create(AuthService.class, actorRef), "authService");
        ClusterClientReceptionist.get(system).registerService(authService);
    }
}
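For context, requests reach this service through a ClusterClient talking to the receptionist that config() registers the service with. On the client side it looks roughly like this (sketch only; the AuthRequest constructor is an assumption, the system name and contact address are taken from the log and config further down):

import java.util.HashSet;
import java.util.Set;

import akka.actor.ActorPath;
import akka.actor.ActorPaths;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.client.ClusterClient;
import akka.cluster.client.ClusterClientSettings;

public class ClientSketch {
    public static void main(String[] args) {
        // Client-side actor system (name taken from the client log below).
        ActorSystem clientSystem = ActorSystem.create("NettyClientSystem");

        // Initial contact point: the receptionist of the cluster node
        // configured at 127.0.0.1:2556 (see the config at the end).
        Set<ActorPath> initialContacts = new HashSet<>();
        initialContacts.add(ActorPaths.fromString(
                "akka.tcp://[email protected]:2556/system/receptionist"));

        ActorRef clusterClient = clientSystem.actorOf(
                ClusterClient.props(ClusterClientSettings.create(clientSystem)
                        .withInitialContacts(initialContacts)),
                "clusterClient");

        // "/user/authService" is the path of the actor registered with the
        // ClusterClientReceptionist in config() above. The AuthRequest
        // constructor is a placeholder.
        clusterClient.tell(
                new ClusterClient.Send("/user/authService",
                        new AuthRequest("testtesttest"), true),
                ActorRef.noSender());
    }
}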
The sharding actor:
public class AuthServiceActor extends AbstractActor {

    public static final String SHARD = "authService";

    @Override
    public void preStart() throws Exception {
        super.preStart();
        getContext().system().log().info("Auth service start, {}", getSelf().path());
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        getContext().system().log().info("Auth service stop {}", getSelf().path());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(AuthRequest.class, authRequest -> {
                    getContext().system().log().info("Receive auth message from user {}", authRequest.getToken());
                    // No real logic here, just return the token back as name and id.
                    String userId = authRequest.getToken();
                    ActorRef sender = getSender();
                    sender.tell(new AuthResponse(userId, userId), getSelf());
                    getContext().system().log().info("Send back auth response {}", userId);
                })
                .build();
    }

    public static ShardRegion.MessageExtractor shardExtractor() {
        return new AuthServiceShardMessageExtractor();
    }

    private static class AuthServiceShardMessageExtractor extends ShardRegion.HashCodeMessageExtractor {

        private static final int SHARD_NUMBER = 64;

        AuthServiceShardMessageExtractor() {
            super(SHARD_NUMBER);
        }

        @Override
        public String entityId(Object o) {
            if (o instanceof AuthRequest) {
                return ((AuthRequest) o).getToken();
            }
            return null;
        }
    }
}
The user actor:
public class UserActor extends AbstractActor {

    public final static String SHARD = "User";

    private ActorRef sessionActor;

    @Override
    public void preStart() throws Exception {
        super.preStart();
        getContext().setReceiveTimeout(
                scala.concurrent.duration.Duration.create(300, TimeUnit.SECONDS));
        getContext().system().log().info("User actor start {}", self().path());
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        getContext().system().log().info("User actor stop {}", self().path());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(RegisterSession.class, registerSession -> {
                    // Remember the session actor that registered this user
                    // (the field was never assigned in the original paste).
                    sessionActor = registerSession.getActorRef();
                    ActorRef sender = getSender();
                    getContext().system().log().info(
                            "Send register success message to current session actor {}",
                            sessionActor.path());
                    sender.tell(true, getSelf());
                })
                // ... (remaining handlers were cut off in the paste)
                .build();
    }
}
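UserActor.shardExtractor() is referenced in UserService.config() below but got cut from the paste; it follows the same pattern as the auth extractor, roughly like this (the shard count here is a placeholder; judging from shard [330] in the log, the real value is larger than 64):

    public static ShardRegion.MessageExtractor shardExtractor() {
        // Placeholder shard count; the actual number is not shown in the paste.
        return new ShardRegion.HashCodeMessageExtractor(1000) {
            @Override
            public String entityId(Object o) {
                if (o instanceof RegisterSession) {
                    return ((RegisterSession) o).getUserId();
                }
                return null;
            }
        };
    }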
The user service:
public class UserService extends BaseService {

    public UserService(ActorRef serviceActor) {
        super(serviceActor);
    }

    public static void config(ActorSystem system) {
        ClusterSharding.get(system)
                .start(UserActor.SHARD,
                        Props.create(UserActor.class, UserActor::new),
                        ClusterShardingSettings.create(system),
                        UserActor.shardExtractor());

        ActorRef actorRef = ClusterSharding.get(system).shardRegion(UserActor.SHARD);
        ActorRef userService = system.actorOf(Props.create(UserService.class, actorRef), "userService");
        ClusterClientReceptionist.get(system).registerService(userService);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(RegisterSession.class, registerSession -> {
                    getContext().system().log().info(
                            "Get register message {} from actor {}",
                            registerSession.getUserId(), registerSession.getActorRef());
                    this.getServiceActor().tell(registerSession, getSender());
                })
                // ... (remaining handlers were cut off in the paste)
                .build();
    }
}
The log from the cluster:
[INFO] [07/14/2017 12:13:27.306]
[NettyServerClusterSharding-akka.actor.default-dispatcher-15]
[akka.actor.ActorSystemImpl(NettyServerClusterSharding)] Get auth request
from client testtesttest
[DEBUG] [07/14/2017 12:13:27.307]
[NettyServerClusterSharding-akka.actor.default-dispatcher-15]
[akka.tcp://[email protected]:2556/system/sharding/authService/10]
Starting entity [testtesttest] in shard [10]
[INFO] [07/14/2017 12:13:27.307]
[NettyServerClusterSharding-akka.actor.default-dispatcher-15]
[akka.actor.ActorSystemImpl(NettyServerClusterSharding)] Auth service
start,
akka://NettyServerClusterSharding/system/sharding/authService/10/testtesttest
[INFO] [07/14/2017 12:13:27.307]
[NettyServerClusterSharding-akka.actor.default-dispatcher-15]
[akka.actor.ActorSystemImpl(NettyServerClusterSharding)] Receive auth
message from user testtesttest
[INFO] [07/14/2017 12:13:27.307]
[NettyServerClusterSharding-akka.actor.default-dispatcher-15]
[akka.actor.ActorSystemImpl(NettyServerClusterSharding)] Send back auth
response testtesttest
[INFO] [07/14/2017 12:13:27.309]
[NettyServerClusterSharding-akka.actor.default-dispatcher-20]
[akka.actor.ActorSystemImpl(NettyServerClusterSharding)] Get register
message testtesttest from actor
Actor[akka.tcp://[email protected]:2551/user/SessionActor95686a26#2016505524]
[DEBUG] [07/14/2017 12:13:27.309]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User]
Request shard [330] home
[DEBUG] [07/14/2017 12:13:27.511]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/UserCoordinator/singleton/coordinator]
Shard [330] allocated at
[Actor[akka://NettyServerClusterSharding/system/sharding/User#2032659771]]
[DEBUG] [07/14/2017 12:13:27.511]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User]
Host Shard [330]
[DEBUG] [07/14/2017 12:13:27.511]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User]
Starting shard [330] in region
[DEBUG] [07/14/2017 12:13:27.512]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User]
Shard [330] located at
[Actor[akka://NettyServerClusterSharding/system/sharding/User#2032659771]]
[DEBUG] [07/14/2017 12:13:27.512]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User]
Shard was initialized 330
[DEBUG] [07/14/2017 12:13:27.512]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User]
Deliver [1] buffered messages for shard [330]
[DEBUG] [07/14/2017 12:13:27.512]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.tcp://[email protected]:2556/system/sharding/User/330]
Starting entity [testtesttest] in shard [330]
[INFO] [07/14/2017 12:13:27.512]
[NettyServerClusterSharding-akka.actor.default-dispatcher-5]
[akka.actor.ActorSystemImpl(NettyServerClusterSharding)] User actor start
akka://NettyServerClusterSharding/system/sharding/User/330/testtesttest
As you can see, it takes about 200 ms for the coordinator to allocate the shard and start the entity actor.
If I increase the number of clients, this sometimes grows to more than 1 s,
which is not acceptable for a high-volume application.
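The difference between the first and subsequent messages can be reproduced roughly like this (sketch only; the RegisterSession constructor and the session ActorRef are placeholders, and blocking on the ask is just for measuring):

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.sharding.ClusterSharding;
import akka.pattern.PatternsCS;

public class ShardLatencyCheck {
    // sessionRef is a placeholder for the session actor; the RegisterSession
    // constructor signature is assumed here.
    static void measure(ActorSystem system, ActorRef sessionRef) throws Exception {
        ActorRef region = ClusterSharding.get(system).shardRegion(UserActor.SHARD);
        RegisterSession msg = new RegisterSession("testtesttest", sessionRef);

        long t0 = System.nanoTime();
        // Cold path: coordinator allocates the shard, then the entity starts.
        PatternsCS.ask(region, msg, 5000).toCompletableFuture().get();
        long coldMs = (System.nanoTime() - t0) / 1_000_000;

        long t1 = System.nanoTime();
        // Warm path: the entity is already running.
        PatternsCS.ask(region, msg, 5000).toCompletableFuture().get();
        long warmMs = (System.nanoTime() - t1) / 1_000_000;

        system.log().info("cold = {} ms, warm = {} ms", coldMs, warmMs);
    }
}

The ask completes because UserActor replies true to RegisterSession; only the first call includes the coordinator round-trip seen in the log above.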
Does anyone know why a tell on the region ActorRef obtained via
ActorRef actorRef = ClusterSharding.get(system).shardRegion(UserActor.SHARD)
is so slow with cluster sharding?
The config file:

include "application"

akka {
  loglevel = "DEBUG"

  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2556
    }
  }

  cluster {
    seed-nodes = ["akka.tcp://[email protected]:2556"]
    roles = [clusterService]
  }

  cluster.sharding.state-store-mode = persistence

  persistence {
    journal.plugin = "akka-contrib-mongodb-persistence-journal"
    snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
  }

  contrib.persistence.mongodb.mongo {
    mongouri = "mongodb://127.0.0.1:27017/netty"
    journal-collection = "journal"
    snaps-collection = "snapshots"
  }
}

akka.cluster.role {
  clusterService.min-nr-of-members = 1
}

akka.cluster.sharding.role = "clusterService"
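With state-store-mode = persistence, the coordinator presumably has to write every shard allocation to the MongoDB journal before answering the region. For comparison, the coordinator state could also be kept in Distributed Data (untested sketch; whether that actually removes the delay is part of my question):

# Sketch: keep shard coordinator state in Akka Distributed Data instead of
# the persistence journal.
akka.cluster.sharding.state-store-mode = ddata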
Thanks