Hi everyone,
I have a scenario where I use Cluster Sharding as follows.
Each Node/Machine:
1. Cluster Sharding enabled ActorSystem1 running on port XXXX behind
Docker Container
2. There is only one ShardType in this ActorSystem1 and 10 Shards
3. The Entity actors are persistent using Cassandra
The message flow looks like this
- Message1 to ShardRegion
- Message1 reaches PersistentEntityActor
- PersistentEntityActor asks for something from a RemoteActor( in a
different actor system with different port in the same machine).
PersistentEntityActor knows about the RemoteActorReferencce through
constructor - please see code below
- RemoteActor forwards the message to a child worker.
- Child worker need to respond back to the PersistentEntityActor who
asked for something.
The flow works fine when the Cluster Sharding and RemoteActor are running
in their respective actorsystems with different ports.
But when I create the RemoteActor in the same actor system as
ClusterSharding as a normal actor. The child worker is not sending response
back to the PersistentEntityActor.
class PersistentEntityActor(remoteActorRef: ActorRef) extends Actor {
override def receive: Receive = {
case Cmd(data) => {
implicit val timeout = Timeout.apply(6 seconds)
val response = remoteRouterRef ? "Hey there"
response map (resp =>
resp match {
case ResultA =>
println(s"The response from remote actor for ask is : ResultA")
case ResultB =>
println(s"The response from remote actor for ask is : ResultB")
}
)
}
}
}
object PersistentEntityActor {
case class Cmd(data: Int)
// I have skipped events to keep the code simple
val extractShardId: ExtractShardId = {
case Cmd(data) => (data % 2).toString
}
val extractEntityId: ExtractEntityId = {
case cmd: Cmd => (cmd.data.toString, cmd)
}
case class ResultA()
case class ResultB()
}
class RemoteActor extends Actor{
override def receive: Receive = {
case anyMsg =>
val child = context.actorOf(Props[Worker])
child.forward(anyMsg)
}
}
class Worker extends Actor{
override def receive: Receive = {
case msg =>
sender() ! ResultA
}
}
object RunWithSeparateActorSystem extends App {
// create remote actor
val remoteConfig =
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + 5150).
withFallback(ConfigFactory.load("remoting_conf"))
val remoteEnabledActorSystem = ActorSystem("RemoteActorSystem",
remoteConfig)
val remoteActor = remoteEnabledActorSystem.actorOf(Props[RemoteActor],
"RemoteRouter")
// cluster sharding
val clusterConfig =
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + 2551).
withFallback(ConfigFactory.load("sharding_cassandra_conf"))
val system = ActorSystem("ClusterSystem", clusterConfig)
val shardRegionRef =
ClusterSharding(system).start(typeName = "ShardActorToCallRemote",
entityProps = Props(new ShardActorToCallRemote(remoteActor)),
settings = ClusterShardingSettings(system),
extractShardId = ShardActorToCallRemote.extractShardId,
extractEntityId = ShardActorToCallRemote.extractEntityId)
// give some time for the shard setup to come up
Thread.sleep(4000)
shardRegionRef ! Cmd(10090)
}
object RunWithSameActorSystem extends App {
// cluster sharding
val clusterConfig =
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + 2551).
withFallback(ConfigFactory.load("sharding_cassandra_conf"))
val system = ActorSystem("ClusterSystem", clusterConfig)
// create remote actor
val remoteActor = system.actorOf(Props[RemoteActor], "RemoteRouter")
val shardRegionRef =
ClusterSharding(system).start(typeName = "ShardActorToCallRemote",
entityProps = Props(new ShardActorToCallRemote(remoteActor)),
settings = ClusterShardingSettings(system),
extractShardId = ShardActorToCallRemote.extractShardId,
extractEntityId = ShardActorToCallRemote.extractEntityId)
// give some time for the shard setup to come up
Thread.sleep(4000)
shardRegionRef ! Cmd(10090)
}
sharding_cassandra_conf.conf
akka {
loglevel = INFO
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = ["akka.tcp://[email protected]:2551"]
auto-down-unreachable-after = 10s
metrics.enabled = off
sharding {
remember-entities = on
journal-plugin-id = "cassandra-journal"
snapshot-plugin-id = "cassandra-snapshot-store"
rebalance-interval = 10 s
}
}
persistence {
journal.plugin = cassandra-journal
snapshot-store.plugin = cassandra-snapshot-store
}
}
remoting_conf.conf
akka {
loglevel = "INFO"
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
}
log-sent-messages = on
log-received-messages = on
}
}
Thanks.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.