Hi! I've had some problems handling temporary network
partitions/communication problems, which cause nodes to get quarantined.
Earlier I found out that I can hit the system message buffer size limit
(https://groups.google.com/d/topic/akka-user/NGLi9GTZ42o/discussion), which
causes quarantine. In that earlier case it was due to mass death/creation of
remotely watched actors.
Now I have run into another use case where I get quarantined nodes, this
time not because of excessive load but because of a network partition
combined with publish-subscribe. My use case may not be the most common, but
let me try to explain it anyway:
I have one node running my own Java Akka process (ServerA) and one node
running Play Framework (ServerB). In ServerA I've named the actor system
"myactorsystem" (ActorSystemA). At first I tried to make Play Framework's
internal actor system (ActorSystemB) join the same cluster as
ActorSystemA, but that didn't work because apparently Play Framework's
actor system is named "application" and that name can't be changed. I ended up
creating a separate actor system (ActorSystemC) in my Play application with
the name "myactorsystem" and connecting that to the cluster, which seemed to
work fine. I use WebSockets in Play Framework, and each WebSocket is
represented by an actor in ActorSystemB. The ActorRef of this WebSocket actor
is sent to an actor in ActorSystemA, through a cluster-aware router in
ActorSystemC, and the actor in ActorSystemA subscribes it to a topic using
publish-subscribe.
It all runs fine until I introduce a network partition between the nodes
for a few seconds. When I remove the partition, ActorSystemA and
ActorSystemB reconnect to each other fine, but ActorSystemC gets
quarantined by ActorSystemA. I get the following in the log:
[ERROR] [02/11/2015 09:14:33.059] [myactorsystem-akka.remote.default-remote-dispatcher-7] [Remoting] Association to [akka.tcp://application@ip:port] with UID [-1690961114] irrecoverably failed. Quarantining address.
java.lang.IllegalStateException: Error encountered while processing system message acknowledgement buffer: [-1 {}] ack: ACK[3, {}]
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:287)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:188)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Highest SEQ so far was -1 but cumulative ACK is 3
    at akka.remote.AckedSendBuffer.acknowledge(AckedDelivery.scala:103)
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:283)
    ... 12 more
[INFO] [02/11/2015 09:14:33.740] [myactorsystem-akka.remote.default-remote-dispatcher-7] [Remoting] Quarantined address [akka.tcp://application@ip2:port2] is still unreachable or has not been restarted. Keeping it quarantined.
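To make clear how I read the "Highest SEQ so far was -1 but cumulative ACK is 3" message, here is a minimal sketch in plain Java. It is only my simplified model of a sender-side acknowledgement buffer, not Akka's actual code: the class name AckedSendBufferSketch and its methods are made up for illustration. The assumption is that the sender tracks the highest sequence number it has buffered (starting at -1 when nothing has been sent) and rejects a cumulative ACK that is higher than that. If that is roughly what happens, the error would be consistent with one side having a fresh, empty buffer while the other side still acknowledges sequence numbers from before the partition, but I have not confirmed this.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a sender-side acknowledgement buffer.
// This is NOT Akka's implementation; names and structure are illustrative only.
final class AckedSendBufferSketch {
    // Sequence numbers that were sent but not yet acknowledged, in send order.
    private final Deque<Long> unacked = new ArrayDeque<>();
    // Highest sequence number buffered so far; -1 means nothing was buffered yet.
    private long highestSeq = -1;

    // Record a newly sent message; sequence numbers must be strictly increasing.
    void buffer(long seq) {
        if (seq <= highestSeq) {
            throw new IllegalArgumentException(
                "Sequence number " + seq + " is not above highest SEQ " + highestSeq);
        }
        unacked.addLast(seq);
        highestSeq = seq;
    }

    // Apply a cumulative ACK: everything up to and including it is confirmed.
    // An ACK above the highest buffered sequence number is treated as an error.
    void acknowledge(long cumulativeAck) {
        if (cumulativeAck > highestSeq) {
            throw new IllegalArgumentException(
                "Highest SEQ so far was " + highestSeq
                    + " but cumulative ACK is " + cumulativeAck);
        }
        while (!unacked.isEmpty() && unacked.peekFirst() <= cumulativeAck) {
            unacked.removeFirst();
        }
    }

    // Number of messages still waiting for an acknowledgement.
    int pending() {
        return unacked.size();
    }

    public static void main(String[] args) {
        // A fresh buffer (highest SEQ -1) receiving an ACK for sequence number 3.
        AckedSendBufferSketch fresh = new AckedSendBufferSketch();
        try {
            fresh.acknowledge(3);
        } catch (IllegalArgumentException e) {
            // prints "Highest SEQ so far was -1 but cumulative ACK is 3"
            System.out.println(e.getMessage());
        }
    }
}
```

In this model a buffer that has sent 0, 1 and 2 accepts ACK 1 and keeps only message 2 pending, while a brand-new buffer rejects ACK 3 with the same wording as in my log.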
Attached is a scaled-down version, with the configs, that I used to reproduce
the problem. I used Akka 2.3.9, deployed it on Amazon EC2 and simulated a
network partition using iptables.
In the full system I also have seen the following:
17:45:13.524UTC ERROR[reactor-akka.actor.default-dispatcher-2] Remoting -
Association to [akka.tcp://myactorsystem@ip:port] with UID [1330505507]
irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Delivery of system messages timed
out and they were dropped.
This doesn't seem right. It's fair that I get a buffer overflow when I put
excessive load on the system, but getting quarantined after a few seconds
of network partition isn't. This doesn't happen if I don't use
publish-subscribe, which leads me to think that publish-subscribe uses system
messages as well. Is that right? Also, I've run into quarantine problems
quite a few times now, so I'm having a hard time knowing how I should tackle
them. First, I need to get the system into shape so that quarantine never
happens under reasonably normal circumstances. Second, for the rare cases
where it does happen, is there some event I can listen for so that I can shut
down properly or take other appropriate action? Right now all I get are some
log messages.
So far I know that remote death watch, remote actor deployment and
publish-subscribe use system messages and can therefore cause quarantine. What
else is there?
I'm happy to file a bug report if you think that's appropriate for this
case.
# This is the main configuration file for the application.
# ~~~~~
# Secret key
# ~~~~~
# The secret key is used to secure cryptographic functions.
# If you deploy your application to several instances be sure to use the same key!
application.secret=">S7PM0Fx7QYQUysMC5hBBGFFpr@oE:6TJN^utWNP3Id^tAES<iVb@MK^PYNSIveL"
# The application languages
# ~~~~~
application.langs="en"
# Global object class
# ~~~~~
# Define the Global object class for this application.
# Default to Global in the root package.
# application.global=Global
# Router
# ~~~~~
# Define the Router object to use for this application.
# This router will be looked up first when the application is starting up,
# so make sure this is the entry point.
# Furthermore, it's assumed your route file is named properly.
# So for an application router like `conf/my.application.Router`,
# you may need to define a router file `my.application.discussionsModule.projectModule.routes`.
# Default to Routes in the root package (and `conf/discussionsModule.projectModule.routes`)
# application.router=my.application.Routes
# Logger
# ~~~~~
# You can also configure logback (http://logback.qos.ch/),
# by providing an application-logger.xml file in the conf directory.
# Root logger:
logger.root=ERROR
# Logger used by the framework:
logger.play=INFO
# Logger provided to your application:
logger.application=DEBUG
myactorsystem {
  akka.actor.debug.receive = true
  akka {
    loglevel = "INFO"
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
    remote {
      log-remote-lifecycle-events = off
      log-received-messages = on
      netty.tcp {
        hostname = "{{ ipB }}"
        port = 27873
      }
    }
    cluster {
      roles = [play_dashboard-1_0]
      seed-nodes = [
        "akka.tcp://myactorsystem@{{ ipA }}:2000"]
      auto-down-unreachable-after = off
    }
  }
}
# Config for the application actor system
akka.actor.debug.receive = true
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    log-received-messages = on
    netty.tcp {
      hostname = "{{ ipB }}"
      port = 27874
    }
  }
}
akka {
  loglevel = "INFO"
  #loggers = ["akka.event.slf4j.Slf4jLogger"]
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "{{ ipA }}"
      port = 2000
    }
  }
  cluster {
    auto-down-unreachable-after = off
    roles = ["ServerA"]
    seed-nodes = ["akka.tcp://myactorsystem@{{ ipA }}:2000"]
  }
}
package test;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

public class ServerA {

    public static void main(String[] args) {
        // ActorSystemA: the cluster node named "myactorsystem".
        ActorSystem actor_system_a = ActorSystem.create("myactorsystem", ConfigFactory.load("ServerA"));
        actor_system_a.actorOf(Props.create(ServerActor.class), "actor");
    }

    public static class ServerActor extends UntypedActor {
        // Last ActorRef received through the cluster-aware router (an actor in ActorSystemB).
        private ActorRef actorRef = null;

        @Override
        public void preStart() {
            FiniteDuration interval_duration = Duration.create(1000, TimeUnit.MILLISECONDS);
            // Every second: send "pong" directly and publish "pang" to the topic.
            // Note: the Runnable runs on a dispatcher thread, not inside the actor,
            // so actorRef is read here without synchronization.
            context().system().scheduler().schedule(interval_duration, interval_duration, new Runnable() {
                @Override
                public void run() {
                    if (actorRef != null) actorRef.tell("pong", self());
                    ActorRef mediator = DistributedPubSubExtension.get(context().system()).mediator();
                    mediator.tell(new DistributedPubSubMediator.Publish("topic", "pang"), self());
                }
            }, context().system().dispatcher());
        }

        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof ActorRef) {
                // Remember the remote actor, reply directly, and subscribe it to the topic.
                actorRef = ((ActorRef) message);
                actorRef.tell("ping", self());
                ActorRef mediator = DistributedPubSubExtension.get(context().system()).mediator();
                mediator.tell(new DistributedPubSubMediator.Subscribe("topic", actorRef), actorRef);
            }
        }
    }
}
package test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.routing.RoundRobinGroup;

import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;

public class ServerB {
    private static ActorSystem actorSystemB;
    private static ActorSystem actorSystemC;
    private static ActorRef clusterAwareRouter;

    public static void main(String[] args) {
        // ActorSystemB: stands in for Play's internal "application" actor system.
        actorSystemB = ActorSystem.create("application", ConfigFactory.load("ServerB"));
        actorSystemB.actorOf(Props.create(NonClusterActor.class));

        // ActorSystemC: separate system named "myactorsystem" that joins the cluster.
        actorSystemC = ActorSystem.create("myactorsystem", ConfigFactory.load("ServerB").getConfig("myactorsystem"));

        // Cluster-aware router that routes to /user/actor on nodes with role "ServerA".
        List<String> paths = Lists.newArrayList("/user/actor");
        ClusterRouterGroup router_group = new ClusterRouterGroup(
                new RoundRobinGroup(paths),
                new ClusterRouterGroupSettings(100, paths, false, "ServerA"));
        clusterAwareRouter = actorSystemC.actorOf(router_group.props());
    }

    private static class NonClusterActor extends UntypedActor {
        @Override
        public void preStart() {
            FiniteDuration interval_duration = Duration.create(1000, TimeUnit.MILLISECONDS);
            // After 10 seconds, send this actor's own ActorRef through the router every second.
            context().system().scheduler().schedule(Duration.create(10000, TimeUnit.MILLISECONDS), interval_duration, new Runnable() {
                @Override
                public void run() {
                    System.out.println("Sending message");
                    clusterAwareRouter.tell(getSelf(), getSelf());
                }
            }, context().system().dispatcher());
        }

        @Override
        public void onReceive(Object message) throws Exception {
            if (message instanceof String) {
                System.out.println("NonClusterActor message: " + message);
            }
        }
    }
}