Hi,
We are using the Akka Sharding extension to distribute message broker
consumer actors over our cluster. Our in-house consumer library uses death
watch on the consumers to manage its state (when one of the actors
connecting to the broker is moved to another node, the supervisor no
longer has to worry about handling things like reconnections, etc.).
The problem we're seeing is that sometimes the Terminated message is not
received from the sharded actor. That corrupts our library's internal
state and causes problems.
Of course the library itself could easily be fixed, but I wonder whether
death watch in general shouldn't be used with sharded actors, as it may
cause problems.
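To make concrete what kind of fix I mean: every incarnation of an entity
announces itself to the watcher with a WatchMe message (see the repro
snippet below), so the watcher could treat a second registration for the
same id as proof that the old incarnation is gone, even if its Terminated
was never delivered. A minimal sketch of that idea (hypothetical, not our
actual library code):

import akka.actor.{ Actor, ActorRef, Terminated }

object ResilientWatcher {
  // Same shape as the WatchMe message in the repro snippet below.
  case class WatchMe(actorRef: ActorRef, id: String)
}

// Sketch: a new registration for an already-known id supersedes the stale
// incarnation, so a lost Terminated cannot wedge the state forever.
class ResilientWatcher extends Actor {
  import ResilientWatcher.WatchMe

  private var watched = Map.empty[String, ActorRef]

  override def receive: Receive = {
    case WatchMe(ref, id) =>
      watched.get(id).filter(_ != ref).foreach { stale =>
        // The entity was restarted (possibly on another node); drop the
        // old incarnation even though we never saw its Terminated.
        context.unwatch(stale)
        println(s"Superseding stale ref $stale for id $id")
      }
      watched += id -> ref
      context.watch(ref)

    case Terminated(ref) =>
      watched = watched.filterNot { case (_, r) => r == ref }
      println(s"Terminated received! $ref")
  }
}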
Below is a simplified code snippet that reproduces the problem:
Akka version: 2.4.0 (akka-actor, akka-cluster, akka-cluster-sharding)
akka-persistence-cassandra: 0.4
import akka.actor._
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.{ ExtractEntityId, ExtractShardId }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

import Consumer.{ SomeMessage, Stop }
import Watcher.WatchMe

object Node {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem",
      ConfigFactory.parseString(AkkaConfig.config(args(0).toInt)))
    consumerStart(system)
    scheduleMessages(system)
  }

  def scheduleMessages(system: ActorSystem): Unit = {
    implicit val dispatcher = system.dispatcher
    (0 to 1) foreach { i =>
      system.scheduler.schedule(0.millis, 5.seconds,
        ClusterSharding(system).shardRegion(Consumer.shardName), SomeMessage(i))
    }
  }

  def consumerStart(system: ActorSystem): Unit = {
    val watcher: ActorRef = system.actorOf(Props(new Watcher()))
    ClusterSharding(system).start(
      typeName = Consumer.shardName,
      entityProps = Consumer.props(watcher),
      settings = ClusterShardingSettings(system),
      extractEntityId = Consumer.entityExtractor,
      extractShardId = Consumer.shardExtractor,
      allocationStrategy = new LeastShardAllocationStrategy(2, 1),
      handOffStopMessage = Stop
    )
  }
}
object Consumer {
  def props(watcher: ActorRef): Props = Props(new Consumer(watcher))

  case class SomeMessage(id: Int)
  // Hand-off stop message, referenced as handOffStopMessage in Node above.
  case object Stop

  val shardName = "testShard"

  val entityExtractor: ExtractEntityId = {
    case msg @ SomeMessage(id) => (id.toString, msg)
  }

  val shardExtractor: ExtractShardId = {
    case SomeMessage(id) => id.toString
  }
}
class Consumer(watcher: ActorRef) extends Actor {

  override def preStart(): Unit = {
    println(s"Starting consumer for ${self.path.name}")
    watcher ! WatchMe(self, self.path.name)
  }

  override def postStop(): Unit = {
    println(s"Stopping consumer for ${self.path.name}")
  }

  override def receive: Receive = {
    case SomeMessage(id) =>
      println(s"Received message for $id")
    case Stop =>
      println(s"Received stop message for ${self.path.name}")
      self ! PoisonPill
  }
}
object Watcher {
  case class WatchMe(actorRef: ActorRef, id: String)
}

class Watcher extends Actor {

  override def receive: Receive = {
    case WatchMe(ref, id) =>
      println(s"Watching received! Ref: $ref Id: $id")
      context.watch(ref)
    case Terminated(ref) =>
      println(s"Terminated received! $ref")
  }
}
object AkkaConfig {
  val config: Int => String = port =>
    s"""akka {
       |  actor {
       |    provider = "akka.cluster.ClusterActorRefProvider"
       |  }
       |  remote {
       |    log-remote-lifecycle-events = off
       |    netty.tcp {
       |      hostname = "127.0.0.1"
       |      port = $port
       |    }
       |  }
       |
       |  persistence {
       |    journal.plugin = "cassandra-journal"
       |    snapshot-store.plugin = "cassandra-snapshot-store"
       |  }
       |
       |  cluster {
       |    min-nr-of-members = 1
       |    seed-nodes = [
       |      "akka.tcp://[email protected]:2551",
       |      "akka.tcp://[email protected]:2552"
       |    ]
       |    sharding {
       |      remember-entities = on
       |      journal-plugin-id = "cassandra-journal"
       |      snapshot-plugin-id = "cassandra-snapshot-store"
       |      rebalance-interval = 10 s
       |    }
       |    auto-down-unreachable-after = 10s
       |  }
       |}
       |
       |cassandra-journal {
       |  contact-points = ["cassandra01.weave.local"]
       |}
       |cassandra-snapshot-store {
       |  contact-points = ["cassandra01.weave.local"]
       |}
     """.stripMargin
}
After running this example twice simultaneously, once with main argument
2551 and once with 2552, I see the following logs:
*Node 2551*
Starting consumer for 1
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#-1013061113] Id: 1
Received message for 1
Received message for 1
Received message for 1
Received message for 1
*Node 2552*
Starting consumer for 1
Starting consumer for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#1034416006] Id: 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#26936794] Id: 1
Received message for 0
Received message for 0
Received message for 0
Received message for 0
Received message for 0
As you can see, no Terminated messages are received on either node.
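If it helps with diagnosing, the watcher could also log cluster membership
events next to the watch/Terminated events, to correlate the missing
Terminated with node state changes. A sketch of such a diagnostic watcher
(just an assumption about where to look, not part of the repro above):

import akka.actor.{ Actor, Terminated }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberEvent, UnreachableMember }

// Diagnostic variant of Watcher: logs cluster membership events alongside
// watch registrations and Terminated, so the two streams can be correlated.
class DiagnosticWatcher extends Actor {
  import Watcher.WatchMe // the message defined in the repro snippet

  override def preStart(): Unit =
    Cluster(context.system).subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])

  override def postStop(): Unit =
    Cluster(context.system).unsubscribe(self)

  override def receive: Receive = {
    case WatchMe(ref, id) =>
      println(s"Watching received! Ref: $ref Id: $id")
      context.watch(ref)
    case Terminated(ref) =>
      println(s"Terminated received! $ref")
    case e: MemberEvent =>
      println(s"Cluster event: $e")
    case u: UnreachableMember =>
      println(s"Cluster event: $u")
  }
}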
If, however, I start the first node, give it some time to start, and only
then start the second one, I see:
*Node 2551*
Starting consumer for 0
Starting consumer for 1
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/1/1#-2140659337] Id: 1
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#92401496] Id: 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
[INFO] [11/06/2015 10:05:45.051] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Node [akka.tcp://[email protected]:2552] is JOINING, roles []
[INFO] [11/06/2015 10:05:45.424] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://[email protected]:2551] - Leader is moving node [akka.tcp://[email protected]:2552] to [Up]
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 0
Received message for 1
Received message for 1
Received message for 0
Received stop message for 0
Stopping consumer for 0
Terminated received! Actor[akka://ClusterSystem/system/sharding/testShard/0/0#92401496]
Received message for 1
Received message for 1
Received message for 1
Received message for 1
*Node 2552*
Starting consumer for 0
Watching received! Ref: Actor[akka://ClusterSystem/system/sharding/testShard/0/0#-930183150] Id: 0
Received message for 0
Received message for 0
Received message for 0
Received message for 0
In this scenario, death watch works fine.
I don't know whether starting the nodes together is the only way to
reproduce the problem; I've seen it occur a few times in our OAT
environment, and the example above is the only way I could reproduce it
locally.
I would appreciate any help with this. Maybe I'm missing something
obvious, or perhaps death watch is simply not guaranteed to work with the
Sharding extension, but I haven't seen that stated anywhere in the
documentation.
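In the meantime I'm considering a defensive workaround in the library:
have the watcher periodically re-resolve each watched ref via
actorSelection + Identify, and treat a ref that no longer resolves to the
watched incarnation as terminated. This is only a sketch of the idea
(names are made up):

import akka.actor.{ Actor, ActorIdentity, ActorRef, Identify, Terminated }
import scala.concurrent.duration._

// Workaround sketch: periodically resolve each watched ref; if the watched
// incarnation no longer answers, treat it as terminated even though no
// Terminated message ever arrived.
class SelfHealingWatcher extends Actor {
  import Watcher.WatchMe // the message defined in the repro snippet
  import context.dispatcher

  private case object CheckLiveness

  private var watched = Map.empty[String, ActorRef]

  private val tick =
    context.system.scheduler.schedule(30.seconds, 30.seconds, self, CheckLiveness)

  override def postStop(): Unit = tick.cancel()

  override def receive: Receive = {
    case WatchMe(ref, id) =>
      watched += id -> ref
      context.watch(ref)

    case Terminated(ref) =>
      watched = watched.filterNot { case (_, r) => r == ref }

    case CheckLiveness =>
      watched.foreach { case (id, ref) =>
        // A selection replies ActorIdentity(id, None) if nothing lives at
        // the path; Identify sent to a dead ref would go to dead letters.
        context.actorSelection(ref.path) ! Identify(id)
      }

    case ActorIdentity(id: String, resolved) =>
      watched.get(id).foreach { known =>
        // ActorRef equality includes the UID, so a restarted incarnation
        // at the same path does not count as the one we were watching.
        if (!resolved.contains(known)) {
          context.unwatch(known)
          watched -= id
          println(s"Liveness check: entity $id ($known) considered terminated")
        }
      }
  }
}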
Thanks a lot,
Marcin