I am running the cluster example from:
https://github.com/akka/akka-samples/tree/2.5/akka-sample-cluster-java
When I start a cluster node it shows the following error. Can anyone help me
solve it?

The output is:
//=========================Main 1=================================
//=========================Main 2=================================
[DEBUG] [07/24/2017 15:49:45.072] [main] 
[EventStream(akka://ClusterSystem)] logger log1-Logging$DefaultLogger 
started
[DEBUG] [07/24/2017 15:49:45.088] [main] 
[EventStream(akka://ClusterSystem)] Default Loggers started
[INFO] [07/24/2017 15:49:45.727] [main] [akka.remote.Remoting] Starting 
remoting
[INFO] [07/24/2017 15:49:46.163] [main] [akka.remote.Remoting] Remoting 
started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:1170]
[INFO] [07/24/2017 15:49:46.195] [main] [akka.remote.Remoting] Remoting now 
listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:1170]
[INFO] [07/24/2017 15:49:46.210] [main] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Starting up...
[INFO] [07/24/2017 15:49:46.475] [main] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Registered cluster JMX MBean 
[akka:type=Cluster]
[INFO] [07/24/2017 15:49:46.475] [main] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Started up successfully
//=========================Main 3=================================
[DEBUG] [07/24/2017 15:49:46.600] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster-metrics] Collection 
started.
//=========================Main 4=================================
//========================= CLUSTER INVOKE PRE START METHOD 
=================================
//========================= CLUSTER IS STARTED 
=================================
[DEBUG] [07/24/2017 15:49:46.615] 
[ClusterSystem-akka.actor.default-dispatcher-16] 
[akka.cluster.metrics.MetricsCollector$(akka://ClusterSystem)] Trying 
akka.cluster.metrics.SigarMetricsCollector.
[DEBUG] [07/24/2017 15:49:46.615] 
[ClusterSystem-akka.actor.default-dispatcher-16] 
[akka.cluster.metrics.MetricsCollector$(akka://ClusterSystem)] Trying 
akka.cluster.metrics.JmxMetricsCollector.
[INFO] [07/24/2017 15:49:46.631] 
[ClusterSystem-akka.actor.default-dispatcher-16] 
[akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node 
[akka.tcp://ClusterSystem@127.0.0.1:1170] - Metrics collection has started 
successfully
[WARN] [07/24/2017 15:49:46.662] 
[ClusterSystem-akka.actor.default-dispatcher-3] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/downingProvider]
 
Don't use auto-down feature of Akka Cluster in production. See 
'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.
//========================= GOING INTO CURRENT CLUSTER STATE 
=================================
//========================= CURRENT CLUSTER STATE SUCCESS 
=================================CurrentClusterState(TreeSet(),Set(),Set(),None,Map())
[WARN] [07/24/2017 15:49:56.746] 
[ClusterSystem-akka.actor.default-dispatcher-17] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/joinSeedNodeProcess-1]

Couldn't join seed nodes after [2] attempts, will try again. 
seed-nodes=[akka.tcp://ClusterSystem@127.0.0.2:1170, 
akka.tcp://ClusterSystem@127.0.0.3:1170]
[WARN] [07/24/2017 15:50:01.765] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/cluster/core/daemon/joinSeedNodeProcess-1]

Couldn't join seed nodes after [3] attempts, will try again. 
seed-nodes=[akka.tcp://ClusterSystem@127.0.0.2:1170, 
akka.tcp://ClusterSystem@127.0.0.3:1170]
[INFO] [07/24/2017 15:50:01.812] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1]
 
No response from remote for outbound association. Associate timed out after 
[15000 ms].
[INFO] [07/24/2017 15:50:01.827] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2]
 
No response from remote for outbound association. Associate timed out after 
[15000 ms].
[WARN] [07/24/2017 15:50:01.905] [New I/O boss #3] 
[NettyTransport(akka://ClusterSystem)] Remote connection to null failed 
with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: 
/127.0.0.3:1170
[WARN] [07/24/2017 15:50:01.905] [New I/O boss #3] 
[NettyTransport(akka://ClusterSystem)] Remote connection to null failed 
with org.jboss.netty.channel.ConnectTimeoutException: connection timed out: 
/127.0.0.2:1170
[DEBUG] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1/endpointWriter]

AssociationError [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.3:1170]: Error [Association failed with 
[akka.tcp://ClusterSystem@127.0.0.3:1170]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://ClusterSystem@127.0.0.3:1170]
Caused by: java.util.concurrent.TimeoutException: No response from remote 
for outbound association. Associate timed out after [15000 ms].
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:367)
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:341)
    at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$class.processEvent(FSM.scala:663)
    at 
akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:629)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
    at 
akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519)
    at akka.actor.ActorCell.invoke(ActorCell.scala:488)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
[DEBUG] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter]

AssociationError [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.2:1170]: Error [Association failed with 
[akka.tcp://ClusterSystem@127.0.0.2:1170]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://ClusterSystem@127.0.0.2:1170]
Caused by: java.util.concurrent.TimeoutException: No response from remote 
for outbound association. Associate timed out after [15000 ms].
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:367)
    at 
akka.remote.transport.ProtocolStateActor$$anonfun$2.applyOrElse(AkkaProtocolTransport.scala:341)
    at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at akka.actor.FSM$class.processEvent(FSM.scala:663)
    at 
akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:286)
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:629)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:513)
    at 
akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:286)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519)
    at akka.actor.ActorCell.invoke(ActorCell.scala:488)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
]
[WARN] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1]

Association with remote system [akka.tcp://ClusterSystem@127.0.0.3:1170] 
has failed, address is now gated for [5000] ms. Reason: [Association failed 
with [akka.tcp://ClusterSystem@127.0.0.3:1170]] Caused by: [No response 
from remote for outbound association. Associate timed out after [15000 ms].]
[INFO] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.actor.default-dispatcher-2] 
[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1]
 
Message [akka.actor.Status$Failure] from 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1#-1283183146]
 
to 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1#-1283183146]
 
was not delivered. [1] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[WARN] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0]

Association with remote system [akka.tcp://ClusterSystem@127.0.0.2:1170] 
has failed, address is now gated for [5000] ms. Reason: [Association failed 
with [akka.tcp://ClusterSystem@127.0.0.2:1170]] Caused by: [No response 
from remote for outbound association. Associate timed out after [15000 ms].]
[INFO] [07/24/2017 15:50:01.921] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2]
 
Message [akka.actor.Status$Failure] from 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2#-987867521]
 
to 
Actor[akka://ClusterSystem/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-2#-987867521]
 
was not delivered. [2] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [3] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [4] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [5] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-18] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [6] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter]

Disassociated [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.2:1170]
[INFO] [07/24/2017 15:50:01.968] 
[ClusterSystem-akka.actor.default-dispatcher-2] 
[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter]
 
Message [akka.remote.EndpointWriter$AckIdleCheckTimer$] from 
Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter#1576329271]
 
to 
Actor[akka://ClusterSystem/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.2%3A1170-0/endpointWriter#1576329271]
 
was not delivered. [7] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[DEBUG] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.remote.default-remote-dispatcher-5] 
[akka.tcp://ClusterSystem@127.0.0.1:1170/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FClusterSystem%40127.0.0.3%3A1170-1/endpointWriter]

Disassociated [akka.tcp://ClusterSystem@127.0.0.1:1170] -> 
[akka.tcp://ClusterSystem@127.0.0.3:1170]
[INFO] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [8] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [9] dead 
letters encountered. This logging can be turned off or adjusted with 
configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.
[INFO] [07/24/2017 15:50:01.983] 
[ClusterSystem-akka.actor.default-dispatcher-15] 
[akka://ClusterSystem/deadLetters] Message 
[akka.cluster.InternalClusterAction$InitJoin$] from 
Actor[akka://ClusterSystem/system/cluster/core/daemon/joinSeedNodeProcess-1#1139391083]
 
to Actor[akka://ClusterSystem/deadLetters] was not delivered. [10] dead 
letters encountered, no more dead letters will be logged. This logging can 
be turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.



Also, please tell me how to start the nodes. I am confused about how to run them.
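
To make the question concrete, the address a node ends up with can be compared against the seed addresses that time out in the log (/127.0.0.2:1170 and /127.0.0.3:1170). The following is only a minimal sketch, assuming the argument defaults from the attached main classes (127.0.0.1 and 1170); the class `NodeAddressCheck` and its methods are hypothetical and not part of the akka sample.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the akka sample: it mirrors how the
// attached main classes pick hostname and port from the program arguments.
class NodeAddressCheck {

    // Seed addresses assumed from the connection timeouts in the log.
    static final List<String> SEED_NODES = Arrays.asList(
            "akka.tcp://ClusterSystem@127.0.0.2:1170",
            "akka.tcp://ClusterSystem@127.0.0.3:1170");

    // Same defaults as TransformationBackendMain: 127.0.0.1 and 1170.
    static String resolveAddress(String[] args) {
        String hostname = args.length > 0 ? args[0] : "127.0.0.1";
        String port = args.length > 1 ? args[1] : "1170";
        return "akka.tcp://ClusterSystem@" + hostname + ":" + port;
    }

    // True when the resolved address is one of the assumed seed addresses.
    static boolean isSeedNode(String[] args) {
        return SEED_NODES.contains(resolveAddress(args));
    }

    public static void main(String[] args) {
        String address = resolveAddress(args);
        System.out.println(address + " is seed node: " + isSeedNode(args));
    }
}
```

Without arguments the sketch resolves to 127.0.0.1:1170, which is not in the assumed seed list; with the arguments `127.0.0.2 1170` it resolves to the first seed address.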

package sample.cluster.transformation;

import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
//import akka.cluster.metrics.ClusterMetricsExtension;
//import sample.cluster.transformation.TransformationMessages.TransformationJob;
//import sample.cluster.transformation.TransformationMessages.TransformationResult;
//import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;

//#backend
@SuppressWarnings("deprecation")
public class TransformationBackend extends UntypedActor implements TransformationMessages{

  Cluster cluster = Cluster.get(getContext().system());

  // subscribe to cluster changes, MemberUp
  @Override
  public void preStart() {
    System.out.println("//========================= CLUSTER INVOKE PRE START METHOD =================================");
    cluster.subscribe(getSelf(), MemberUp.class);
    System.out.println("//========================= CLUSTER IS STARTED =================================");
  }

  // unsubscribe from cluster events when the actor stops
  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
    System.out.println("//========================= CLUSTER STOPPED.....! =================================");
  }

  @Override
  public void onReceive(Object message) {
    if (message instanceof TransformationJob) {
      System.out.println("//========================= GOING INTO TRANSFORMATION JOB =================================");
      TransformationJob job = (TransformationJob) message;
      getSender().tell(new TransformationResult(job.getText().toUpperCase()),
          getSelf());
      System.out.println("//========================= TRANSFORMATION JOB SUCCESS.... =================================");
    } else if (message instanceof CurrentClusterState) {
      System.out.println("//========================= GOING INTO CURRENT CLUSTER STATE =================================");
      CurrentClusterState state = (CurrentClusterState) message;
      System.out.println("//========================= CURRENT CLUSTER STATE SUCCESS ================================="+state);
      for (Member member : state.getMembers()) {
        if (member.status().equals(MemberStatus.up())) {
          System.out.println("//========================= MEMBER IS UP =================================");
          register(member);
          System.out.println("//========================= MEMBERS ARE UP ================================="+member);
        }
      }

    } else if (message instanceof MemberUp) {
      System.out.println("//========================= MEMBERS..... =================================");
      MemberUp mUp = (MemberUp) message;
      register(mUp.member());
      System.out.println("//========================= MEMBERS UP REGISTER =================================");
    } else {
      unhandled(message);
      System.out.println("//========================= ERROR IN MEMBERS =================================");
    }
  }

  void register(Member member) {
    System.out.println("//========================= REGISTER MEMBER =================================");
    if (member.hasRole("frontend"))
      getContext().actorSelection(member.address() + "/user/frontend").tell(
          BACKEND_REGISTRATION, getSelf());
  }
}
//#backend
package sample.cluster.transformation;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class TransformationBackendMain {
    public static void main(String[] args) {
        // Override the hostname and port when specified as program arguments
        System.out.println("//=========================Main 1=================================");
        final String hostname = args.length > 0 ? args[0] : "127.0.0.1";
        final String port = args.length > 1 ? args[1] : "1170";
        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname).
                withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
                withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
                withFallback(ConfigFactory.load());
        System.out.println("//=========================Main 2=================================");
        ActorSystem system = ActorSystem.create("ClusterSystem", config);
        System.out.println("//=========================Main 3=================================");
        system.actorOf(Props.create(TransformationBackend.class), "backend");
        System.out.println("//=========================Main 4=================================");
    }
}
package sample.cluster.transformation;

import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import sample.cluster.transformation.TransformationMessages.JobFailed;
import sample.cluster.transformation.TransformationMessages.TransformationJob;

import java.util.ArrayList;
import java.util.List;

import static sample.cluster.transformation.TransformationMessages.BACKEND_REGISTRATION;

//#frontend
public class TransformationFrontend extends UntypedActor {

  List<ActorRef> backends = new ArrayList<ActorRef>();
  int jobCounter = 0;

  @Override
  public void onReceive(Object message) {
    if ((message instanceof TransformationJob) && backends.isEmpty()) {
      TransformationJob job = (TransformationJob) message;
      getSender().tell(
          new JobFailed("Service unavailable, try again later", job),
          getSelf());

    } else if (message instanceof TransformationJob) {
      TransformationJob job = (TransformationJob) message;
      jobCounter++;
      backends.get(jobCounter % backends.size())
          .forward(job, getContext());

    } else if (message.equals(BACKEND_REGISTRATION)) {
      getContext().watch(getSender());
      backends.add(getSender());

    } else if (message instanceof Terminated) {
      Terminated terminated = (Terminated) message;
      backends.remove(terminated.getActor());

    } else {
      unhandled(message);
    }
  }

}
//#frontend
package sample.cluster.transformation;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import sample.cluster.transformation.TransformationMessages.TransformationJob;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TransformationFrontendMain {

    public static void main(String[] args) {
        // Override the hostname and port when specified as program arguments
        final String hostname = args.length > 0 ? args[0] : "127.0.0.1";
        final String port = args.length > 1 ? args[1] : "1170";

        final Config config = ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname).
                withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
                withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
                withFallback(ConfigFactory.load());

        ActorSystem system = ActorSystem.create("ClusterSystem", config);

        
        final ActorRef frontend = system.actorOf(
                Props.create(TransformationFrontend.class), "frontend");
        final FiniteDuration interval = Duration.create(2, TimeUnit.SECONDS);
        final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS));
        final ExecutionContext ec = system.dispatcher();
        final AtomicInteger counter = new AtomicInteger();
        system.scheduler().schedule(interval, interval, new Runnable() {
            public void run() {
                Patterns.ask(frontend,
                        new TransformationJob("hello-" + counter.incrementAndGet()),
                        timeout).onSuccess(new OnSuccess<Object>() {
                    public void onSuccess(Object result) {
                        System.out.println(result);
                    }
                }, ec);
            }

        }, ec);

    }
}
package sample.cluster.transformation;

import java.io.Serializable;

//#messages
public interface TransformationMessages {

  public static class TransformationJob implements Serializable {
   
    private static final long serialVersionUID = 1L;
    private final String text;

    public TransformationJob(String text) {
      this.text = text;
    }

    public String getText() {
      return text;
    }
    
    @Override
    public String toString() {
      return "TransformationJob(" + text + ")";
    }
  }

  public static class TransformationResult implements Serializable {
   
    private static final long serialVersionUID = 1L;
    private final String text;

    public TransformationResult(String text) {
      this.text = text;
    }

    public String getText() {
      return text;
    }

    @Override
    public String toString() {
      return "TransformationResult(" + text + ")";
    }
  }

  public static class JobFailed implements Serializable {
  
    private static final long serialVersionUID = 1L;
    private final String reason;
    private final TransformationJob job;

    public JobFailed(String reason, TransformationJob job) {
      this.reason = reason;
      this.job = job;
    }

    public String getReason() {
      return reason;
    }

    public TransformationJob getJob() {
      return job;
    }

    @Override
    public String toString() {
      return "JobFailed(" + reason + ")";
    }
  }

  public static final String BACKEND_REGISTRATION = "BackendRegistration";

}
//#messages
akka {
  loglevel = "DEBUG"
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = DEBUG
    
   # heartbeat-interval = 5s   # default 4s
   # acceptable-heartbeat-pause = 10s  # default 10s
    
    
    netty.tcp {
      hostname = "127.0.0.1"
      port = 1170
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.2:1170",
      "akka.tcp://ClusterSystem@127.0.0.3:1170"]
    # roles = ["role"]
    auto-down-unreachable-after = 280s
  }
}

# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
include "backend"

akka.cluster.min-nr-of-members = 2

akka.cluster.role {
  watson.min-nr-of-members = 2
}

akka.actor.deployment {
}
include "application"
 
akka {
  loglevel = "DEBUG"
}

Attachment: logback.xml
Description: XML document
