Hi! I've had some problems handling temporary network partitions and
communication problems that cause nodes to get quarantined. Earlier I found
out that I can hit the system message buffer size
(https://groups.google.com/d/topic/akka-user/NGLi9GTZ42o/discussion), which
causes quarantine. In that case it was due to mass death/creation of
remotely watched actors.
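
To make the earlier failure concrete, here is a minimal, hypothetical model of a per-association buffer of unacknowledged system messages (such as the Watch messages sent for remotely watched actors). It is not Akka's implementation: the class name `SystemMessageBuffer` and the capacity of 3 are made up, and I believe the corresponding Akka limit is `akka.remote.system-message-buffer-size`. What it illustrates is that the limit counts messages still waiting for an acknowledgement, so a burst of watches toward a slow or unreachable node fills it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a bounded buffer of unacknowledged
// system messages for one remote association. Not Akka's implementation.
class SystemMessageBuffer {
    private final int capacity;
    private final Deque<Long> unacked = new ArrayDeque<>();
    private long nextSeq = 0;

    SystemMessageBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Buffers a system message (e.g. a Watch) and returns its sequence number.
    // Throws when the buffer is full; in this model that is the point where the
    // association would be given up and the remote address quarantined.
    long send() {
        if (unacked.size() == capacity) {
            throw new IllegalStateException(
                "buffer full: " + capacity + " unacknowledged system messages");
        }
        long seq = nextSeq++;
        unacked.addLast(seq);
        return seq;
    }

    // A cumulative ACK releases every buffered message up to and including ackSeq.
    void acknowledge(long ackSeq) {
        while (!unacked.isEmpty() && unacked.peekFirst() <= ackSeq) {
            unacked.removeFirst();
        }
    }

    int pending() {
        return unacked.size();
    }

    public static void main(String[] args) {
        SystemMessageBuffer buffer = new SystemMessageBuffer(3);
        // Burst of Watch messages while the remote node is not acknowledging.
        for (int i = 0; i < 3; i++) {
            buffer.send();
        }
        try {
            buffer.send();
        } catch (IllegalStateException e) {
            System.out.println("Association failed: " + e.getMessage());
        }
        buffer.acknowledge(1); // remote acknowledges seq 0 and 1
        System.out.println("Pending after ACK: " + buffer.pending());
    }
}
```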

Now I have run into another case where nodes get quarantined, this time not
because of excessive load but because of a network partition combined with
publish-subscribe. My use case may not be the most typical, but let me try
to explain it:

I have one node running my own Java Akka process (ServerA) and one node
running Play Framework (ServerB). In ServerA the actor system is called
"myactorsystem" (ActorSystemA). At first I tried to make Play Framework's
internal actor system (ActorSystemB) join the same cluster as ActorSystemA,
but that didn't work because Play's actor system is apparently named
"application" and that name can't be changed. I ended up creating a
separate actor system (ActorSystemC) named "myactorsystem" in my Play
application and connecting that to the cluster, which seemed to work fine.
I use WebSockets in Play, and each WebSocket is represented by an actor in
ActorSystemB. This WebSocket actor is sent, through a cluster-aware router
in ActorSystemC, to an actor in ActorSystemA, which subscribes it to a topic
using publish-subscribe.
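
Why ActorSystemB could not join while ActorSystemC could comes down to the actor system name, which is part of every Akka address and has to be the same on all members of a cluster. The sketch below pulls the system name out of an address with `java.net.URI`; `AkkaAddressCheck` and `sameSystemName` are hypothetical helpers, and the IP addresses are placeholders for the `{{ ipA }}`/`{{ ipB }}` values in the configs.

```java
import java.net.URI;

// Hypothetical helper: compares the actor system names embedded in two
// Akka remote addresses of the form akka.tcp://<system>@<host>:<port>.
class AkkaAddressCheck {

    // The system name is the user-info part of the URI, e.g. "myactorsystem"
    // in akka.tcp://myactorsystem@10.0.0.1:2000.
    static String systemName(String address) {
        String name = URI.create(address).getUserInfo();
        if (name == null) {
            throw new IllegalArgumentException("no actor system name in " + address);
        }
        return name;
    }

    // A node can only join a cluster whose seed nodes use the same system name.
    static boolean sameSystemName(String seedNode, String self) {
        return systemName(seedNode).equals(systemName(self));
    }

    public static void main(String[] args) {
        String seed = "akka.tcp://myactorsystem@10.0.0.1:2000";          // ActorSystemA
        String playInternal = "akka.tcp://application@10.0.0.2:27874";   // ActorSystemB
        String playCluster = "akka.tcp://myactorsystem@10.0.0.2:27873";  // ActorSystemC
        System.out.println("ActorSystemB can join: " + sameSystemName(seed, playInternal));
        System.out.println("ActorSystemC can join: " + sameSystemName(seed, playCluster));
    }
}
```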

It all runs fine until I introduce a network partition between the nodes
for a few seconds. When I remove the partition, ActorSystemA and
ActorSystemC reconnect to each other fine, but ActorSystemB (the
"application" system) gets quarantined by ActorSystemA. I get the following
in the log:

[ERROR] [02/11/2015 09:14:33.059] [myactorsystem-akka.remote.default-remote-dispatcher-7] [Remoting] Association to [akka.tcp://application@ip:port] with UID [-1690961114] irrecoverably failed. Quarantining address.
java.lang.IllegalStateException: Error encountered while processing system message acknowledgement buffer: [-1 {}] ack: ACK[3, {}]
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:287)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:188)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Highest SEQ so far was -1 but cumulative ACK is 3
    at akka.remote.AckedSendBuffer.acknowledge(AckedDelivery.scala:103)
    at akka.remote.ReliableDeliverySupervisor$$anonfun$receive$1.applyOrElse(Endpoint.scala:283)
    ... 12 more

[INFO] [02/11/2015 09:14:33.740] [myactorsystem-akka.remote.default-remote-dispatcher-7] [Remoting] Quarantined address [akka.tcp://application@ip2:port2] is still unreachable or has not been restarted. Keeping it quarantined.
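
The `Caused by` line names the invariant that was broken: the peer sent a cumulative acknowledgement for sequence number 3, while the local send buffer had not sent anything yet (highest sequence number -1, empty buffer `[-1 {}]`). One plausible reading, not confirmed against the Akka source, is that the sending side's buffer was recreated during the partition while the receiving side still remembered the old sequence numbers. The class below is a hypothetical, simplified model of that check; `AckedBuffer` is a made-up name and only the error text is copied from the log.

```java
// Hypothetical, simplified model of the cumulative-ACK check behind the
// "Highest SEQ so far was -1 but cumulative ACK is 3" error. Not Akka's code.
class AckedBuffer {
    private long maxSeq = -1;    // highest sequence number sent; -1 = nothing sent yet
    private long lastAcked = -1; // highest sequence number acknowledged so far

    long send() {
        maxSeq++;
        return maxSeq;
    }

    // A cumulative ACK claims that every message up to `ack` has arrived.
    // Acknowledging a message that was never sent means both sides disagree
    // about the sequence state, so the association cannot safely continue.
    void acknowledge(long ack) {
        if (ack > maxSeq) {
            throw new IllegalArgumentException(
                "Highest SEQ so far was " + maxSeq + " but cumulative ACK is " + ack);
        }
        lastAcked = Math.max(lastAcked, ack);
    }

    long lastAcked() {
        return lastAcked;
    }

    public static void main(String[] args) {
        AckedBuffer before = new AckedBuffer();
        for (int i = 0; i < 4; i++) {
            before.send();       // seq 0..3 sent before the partition
        }
        before.acknowledge(3);   // accepted: 3 <= highest sent

        AckedBuffer fresh = new AckedBuffer(); // recreated buffer, peer still at seq 3
        try {
            fresh.acknowledge(3);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // same text as the log line
        }
    }
}
```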

Attached is a scaled-down version, with the configs I used to reproduce the
problem. I used Akka 2.3.9, deployed it on Amazon EC2, and simulated the
network partition using iptables.


In the full system I have also seen the following:

17:45:13.524UTC ERROR[reactor-akka.actor.default-dispatcher-2] Remoting - Association to [akka.tcp://myactorsystem@ip:port] with UID [1330505507] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Delivery of system messages timed out and they were dropped.
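
This second error is a different failure mode: system messages that stay unacknowledged for too long are dropped and the association is given up. Together with the earlier INFO line ("Keeping it quarantined"), the behaviour can be pictured with the hypothetical model below. It is not Akka's implementation: `QuarantineModel`, the explicit clock arguments and the 5-second timeout are made up. The only ideas taken from the logs are that overdue system messages lead to quarantine, and that a quarantine applies to a specific address and UID, so only a restarted remote actor system (which gets a new UID) can be associated with again.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of two rules visible in the logs (not Akka's code):
// 1. system messages unacknowledged past a deadline make the association give up;
// 2. quarantine is recorded per (address, UID), so only a restarted remote
//    actor system, which has a new UID, may associate again.
class QuarantineModel {
    private final long deliveryTimeoutMillis;
    private final Set<String> quarantined = new HashSet<>();

    QuarantineModel(long deliveryTimeoutMillis) {
        this.deliveryTimeoutMillis = deliveryTimeoutMillis;
    }

    private static String key(String address, int uid) {
        return address + "#" + uid;
    }

    // oldestUnackedSentAt: send time of the oldest unacknowledged system
    // message, or -1 if nothing is pending. Returns true if quarantined now.
    boolean checkDelivery(String address, int uid, long oldestUnackedSentAt, long now) {
        if (oldestUnackedSentAt >= 0 && now - oldestUnackedSentAt > deliveryTimeoutMillis) {
            quarantined.add(key(address, uid));
            return true;
        }
        return false;
    }

    // New associations are refused while this incarnation is quarantined.
    boolean mayAssociate(String address, int uid) {
        return !quarantined.contains(key(address, uid));
    }

    public static void main(String[] args) {
        QuarantineModel remoting = new QuarantineModel(5_000);
        String peer = "akka.tcp://myactorsystem@10.0.0.1:2000";
        int uid = 1330505507;

        // During the partition the oldest Watch has been pending for 7 seconds.
        System.out.println("Quarantined: " + remoting.checkDelivery(peer, uid, 0, 7_000));
        System.out.println("Same UID may reconnect: " + remoting.mayAssociate(peer, uid));
        System.out.println("Restarted system may reconnect: " + remoting.mayAssociate(peer, 42));
    }
}
```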


This doesn't seem right. It's understandable that I hit a buffer limit when
I put excessive load on the system, but getting quarantined after a few
seconds of network partition isn't. This doesn't happen if I don't use
publish-subscribe, which leads me to think that publish-subscribe uses
system messages as well. Is that right? I've also run into quarantine
problems quite a few times now, so I'm having a hard time knowing how I
should tackle them. First, I need to get the system into a shape where this
never happens under reasonably normal circumstances. For the rare cases
where it does happen, is there some event I can listen to so that I can shut
down properly or take other appropriate action? Right now all I get are log
messages.

So far I know that remote death watch, remote actor deployment and
publish-subscribe use system messages, which can cause quarantine. What else
is there?
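
A possible reason publish-subscribe shows up in that list: my understanding, not verified against akka-contrib 2.3.9, is that the mediator's per-topic actor watches each subscriber so it can drop it when it terminates, and watching an actor on another node sends a Watch system message to that node. The hypothetical `TopicModel` below only counts those watches; it shows why subscribing the WebSocket actor from ActorSystemB makes ServerA exchange system messages with ActorSystemB.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not Akka's code: a topic that watches every subscriber
// so it can remove it on termination. Watching an actor that lives on another
// node means sending a Watch system message to that node.
class TopicModel {
    private final String localAddress;
    private final Set<String> subscribers = new LinkedHashSet<>();
    private final Map<String, Integer> watchesSent = new HashMap<>();

    TopicModel(String localAddress) {
        this.localAddress = localAddress;
    }

    // subscriberPath looks like akka.tcp://application@10.0.0.2:27874/user/ws
    void subscribe(String subscriberPath) {
        if (!subscribers.add(subscriberPath)) {
            return; // already subscribed and watched
        }
        String address = addressOf(subscriberPath);
        if (!address.equals(localAddress)) {
            watchesSent.merge(address, 1, Integer::sum); // remote watch = system message
        }
    }

    int watchesSentTo(String address) {
        return watchesSent.getOrDefault(address, 0);
    }

    // Strips the actor path, keeping protocol, system name, host and port.
    static String addressOf(String path) {
        int slash = path.indexOf('/', path.indexOf("//") + 2);
        return slash < 0 ? path : path.substring(0, slash);
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel("akka.tcp://myactorsystem@10.0.0.1:2000");
        // The WebSocket actor is re-sent every second; only the first
        // subscription adds a watch in this model.
        for (int i = 0; i < 3; i++) {
            topic.subscribe("akka.tcp://application@10.0.0.2:27874/user/ws");
        }
        System.out.println(topic.watchesSentTo("akka.tcp://application@10.0.0.2:27874"));
    }
}
```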

I'm happy to file a bug report if you think that's appropriate for this 
case.

package test;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

public class ServerA {

	public static void main(String[] args) {
		ActorSystem actor_system_a = ActorSystem.create("myactorsystem", ConfigFactory.load("ServerA"));
		actor_system_a.actorOf(Props.create(ServerActor.class), "actor");
	}
	
	public static class ServerActor extends UntypedActor {
		
		// Sent to ourselves once per second by the scheduler.
		private static final Object TICK = new Object();
		
		// The remote WebSocket actor (in ActorSystemB) received through the router.
		private ActorRef actorRef = null;
		private Cancellable ticker;
		
		@Override
		public void preStart() {
			FiniteDuration interval_duration = Duration.create(1000, TimeUnit.MILLISECONDS);
			// Schedule a message to self instead of a Runnable, so that actorRef,
			// self() and context() are only touched from inside the actor.
			ticker = context().system().scheduler().schedule(interval_duration, interval_duration,
					self(), TICK, context().dispatcher(), ActorRef.noSender());
		}
		
		@Override
		public void postStop() {
			if (ticker != null) ticker.cancel();
		}

		@Override
		public void onReceive(Object message) throws Exception {
			ActorRef mediator = DistributedPubSubExtension.get(context().system()).mediator();
			if (message == TICK) {
				// Direct message to the remote actor plus a publish to the topic.
				if (actorRef != null) actorRef.tell("pong", self());
				mediator.tell(new DistributedPubSubMediator.Publish("topic", "pang"), self());
			} else if (message instanceof ActorRef) {
				actorRef = (ActorRef) message;
				actorRef.tell("ping", self());
				// Subscribe the remote actor to the topic; the SubscribeAck goes back to it.
				mediator.tell(new DistributedPubSubMediator.Subscribe("topic", actorRef), actorRef);
			} else {
				unhandled(message);
			}
		}
		
	}
}

package test;

import java.util.List;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.routing.RoundRobinGroup;

import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;

public class ServerB {
	
	// ActorSystemB: stands in for Play's internal "application" system (not a cluster member).
	private static ActorSystem actorSystemB;
	// ActorSystemC: the "myactorsystem" system that joins the cluster of ServerA.
	private static ActorSystem actorSystemC;
	
	// Written from main and read by actors in ActorSystemB, hence volatile.
	private static volatile ActorRef clusterAwareRouter;
	
	public static void main(String[] args) {
		actorSystemB = ActorSystem.create("application", ConfigFactory.load("ServerB"));
		actorSystemC = ActorSystem.create("myactorsystem", ConfigFactory.load("ServerB").getConfig("myactorsystem"));
		
		// Route to /user/actor on cluster nodes with role "ServerA" (at most 100 routees, no local routees).
		List<String> paths = Lists.newArrayList("/user/actor");
		ClusterRouterGroup router_group = new ClusterRouterGroup(new RoundRobinGroup(paths), new ClusterRouterGroupSettings(100, paths, false, "ServerA"));
		clusterAwareRouter = actorSystemC.actorOf(router_group.props());

		// Created after the router so the router exists when the first tick fires.
		actorSystemB.actorOf(Props.create(NonClusterActor.class));
	}
	
	public static class NonClusterActor extends UntypedActor {
		
		// Sent to ourselves by the scheduler.
		private static final Object TICK = new Object();
		private Cancellable ticker;
		
		@Override
		public void preStart() {
			FiniteDuration interval_duration = Duration.create(1000, TimeUnit.MILLISECONDS);
			// After 10 s, tick every second; each tick sends this actor's ref to ServerA via the router.
			ticker = context().system().scheduler().schedule(Duration.create(10000, TimeUnit.MILLISECONDS), interval_duration,
					getSelf(), TICK, context().dispatcher(), ActorRef.noSender());
		}
		
		@Override
		public void postStop() {
			if (ticker != null) ticker.cancel();
		}

		@Override
		public void onReceive(Object message) throws Exception {
			if (message == TICK) {
				System.out.println("Sending message");
				clusterAwareRouter.tell(getSelf(), getSelf());
			} else if (message instanceof String) {
				// "ping"/"pong" sent directly by ServerA and "pang" published to the topic.
				System.out.println("NonClusterActor message: " + message);
			} else {
				unhandled(message);
			}
		}
		
	}
}

# This is the main configuration file for the application.
# ~~~~~

# Secret key
# ~~~~~
# The secret key is used to secure cryptographic functions.
# If you deploy your application to several instances be sure to use the same key!
application.secret=""

# The application languages
# ~~~~~
application.langs="en"

# Global object class
# ~~~~~
# Define the Global object class for this application.
# Default to Global in the root package.
# application.global=Global

# Router
# ~~~~~
# Define the Router object to use for this application.
# This router will be looked up first when the application is starting up,
# so make sure this is the entry point.
# Furthermore, it's assumed your route file is named properly.
# So for an application router like `conf/my.application.Router`,
# you may need to define a router file `my.application.discussionsModule.projectModule.routes`.
# Default to Routes in the root package (and `conf/discussionsModule.projectModule.routes`)
# application.router=my.application.Routes

# Logger
# ~~~~~
# You can also configure logback (http://logback.qos.ch/),
# by providing an application-logger.xml file in the conf directory.

# Root logger:
logger.root=ERROR

# Logger used by the framework:
logger.play=INFO

# Logger provided to your application:
logger.application=DEBUG

myactorsystem {
  akka.actor.debug.receive = true
  akka {
    loglevel = "INFO"

    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]

    remote {
      log-remote-lifecycle-events = off
      log-received-messages = on
      netty.tcp {
        hostname = "{{ ipB }}"
        port = 27873
      }
    }

    cluster {
      roles = ["play_dashboard-1_0"]
      seed-nodes = ["akka.tcp://myactorsystem@{{ ipA }}:2000"]
      auto-down-unreachable-after = off
    }
  }
}

# Config for the application actor system
akka.actor.debug.receive = true
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    log-received-messages = on
    netty.tcp {
      hostname = "{{ ipB }}"
      port = 27874
    }
  }
}

akka {
  loglevel = "INFO"
  #loggers = ["akka.event.slf4j.Slf4jLogger"]

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "{{ ipA }}"
      port = 2000
    }
  }

  cluster {
    auto-down-unreachable-after = off
    roles = ["ServerA"]
    seed-nodes = ["akka.tcp://myactorsystem@{{ ipA }}:2000"]
  }
}
