Hi,
I've been trying to test the cluster client feature on the the new akka
2.3.2 version, and i've been finding an 'abnormal' behavior partly because
on inexperience and partly on the shallow scenarios available to show it
practically.
Here's my 2-cents scenario to simulate the issue, kindly run and advice on
the same.
------------------------------------------------------------------------------------------------------------
1. Class : ClusterinProtocol.java
import java.io.Serializable;
public abstract interface ClusterinProtocol {
public static final class Ping implements Serializable {
public final int count;
public Ping(int count) {
this.count = count;
}
@Override
public String toString() {
return "Ping{count='" + count + "'}";
}
};
public static final class Pong implements Serializable {
public final int count;
public Pong(int count) {
this.count = count;
}
@Override
public String toString() {
return "Pong{count='" + count + "'}";
}
};
}
------------------------------------------------------------------------------------------------------------
2. Class : ClusterShardImpl.java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.SnapshotOffer;
import akka.persistence.UntypedEventsourcedProcessor;
public class ClusterShardImpl extends UntypedEventsourcedProcessor {
private LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Cluster cluster = Cluster.get(getContext().system());
private PingTracker tracker = new PingTracker();
public int getPingRequests() {
return tracker.size();
}
@Override
public void preStart() {
try {
super.preStart();
} catch (Exception e) {
e.printStackTrace();
}
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
MemberEvent.class, UnreachableMember.class);
}
@Override
public void postStop() {
super.postStop();
cluster.unsubscribe(getSelf());
}
public ClusterShardImpl() {
log.info("Starting...........");
}
@Override
public void onReceiveRecover(Object message) {
if (message instanceof SnapshotOffer) {
tracker = (PingTracker) ((SnapshotOffer) message).snapshot();
} else if (message instanceof ClusterinProtocol.Ping) {
tracker.update((ClusterinProtocol.Ping) message);
}
}
int count = 0;
@Override
public void onReceiveCommand(Object message) {
log.info("Server : {}", message);
if (message instanceof ClusterinProtocol.Ping) {
count++;
ClusterinProtocol.Ping ping = (ClusterinProtocol.Ping) message;
persist(ping, new Procedure<ClusterinProtocol.Ping>() {
@Override
public void apply(ClusterinProtocol.Ping msg) {
tracker.update(msg);
getContext().system().eventStream().publish(msg);
getSender().tell(new ClusterinProtocol.Pong(count),
getSelf());
log.info("Server Pong Response count : {}", count);
}
});
} else if (message.equals("snap")) {
saveSnapshot(tracker.pingList);
} else if (message.equals("print")) {
System.out.println(tracker.toString());
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member is unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
// ignore
} else
unhandled(message);
}
class PingTracker implements Serializable {
private final List<ClusterinProtocol.Ping> pingList;
PingTracker() {
this(new ArrayList<ClusterinProtocol.Ping>());
}
PingTracker(List<ClusterinProtocol.Ping> pingList) {
this.pingList = pingList;
}
PingTracker copy() {
return new PingTracker(new ArrayList<>(pingList));
}
void update(ClusterinProtocol.Ping ping) {
pingList.add(ping);
}
int size() {
return pingList.size();
}
@Override
public String toString() {
return pingList.toString();
}
}
}
-------------------------------------------------------------------------------------------------------------
3. Class : ClusterClientImpl.java
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.ClusterClient;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class ClusterClientImpl extends UntypedActor {
private LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private final ActorRef clusterClient; // , channel;
public ClusterClientImpl(ActorRef clusterClient) {
this.clusterClient = clusterClient;
log.info("Starting...........");
}
int sent = 0, received = 0;
@Override
public void onReceive(Object message) {
if (message == Tick) {
sent++;
clusterClient.tell(new ClusterClient.Send("/user/clusterShard",
new ClusterinProtocol.Ping(sent)), getSelf());
} else if (message instanceof ClusterinProtocol.Pong) {
received++;
} else
unhandled(message);
log.info("Sent : " + sent + "\tReceived : " + received
+ " : Received message : [{}]", message);
}
public static final Object Tick = new Object() {
@Override
public String toString() {
return "Tick";
}
};
}
-------------------------------------------------------------------------------------------------------------
4. Class : Main.java
import java.util.HashSet;
import java.util.Set;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.contrib.pattern.ClusterClient;
import akka.contrib.pattern.ClusterReceptionistExtension;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
public class Main {
public static void main(String[] args) {
startServer(new String[] { "2551", "2552" });
startClient();
}
private static void startServer(String[] ports) {
for (String port : ports) {
Config conf = ConfigFactory
.parseString(
"akka.actor.provider=akka.cluster.ClusterActorRefProvider\n"
+ "akka.cluster.log-remote-lifecycle-events=on\n"
+ "akka.cluster.seed-nodes=[\"akka.tcp://[email protected]:2551\", "
+ "\"akka.tcp://[email protected]:2552\"]\n"
+ "akka.cluster.auto-down-unreachable-after=10s\n"
+ "akka.remote.netty.tcp.hostname=127.0.0.1\n"
+ "akka.remote.netty.tcp.port="
+ port
+ "\ncontrib.cluster.receptionist.name = receptionist"
+ "\nakka.extensions=[akka.contrib.pattern.ClusterReceptionistExtension]")
.withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterServer", conf);
ActorRef actor = system.actorOf(
Props.create(ClusterShardImpl.class), "clusterShard");
ClusterReceptionistExtension.get(system).registerService(actor);
}
}
private static void startClient() {
Config conf = ConfigFactory.parseString(
"akka.actor.provider=akka.cluster.ClusterActorRefProvider")
// +
// "\nakka.extensions=[akka.contrib.pattern.ClusterReceptionistExtension]"
.withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterClient", conf);
Address clusterAddress = new Address("akka.tcp", "ClusterServer",
"127.0.0.1", 2551);
Set<ActorSelection> initialContacts = new HashSet<>();
initialContacts.add(system.actorSelection(clusterAddress
+ "/user/receptionist"));
ActorRef clusterClient = system.actorOf(ClusterClient
.defaultProps(initialContacts));
system.actorOf(Props.create(ClusterClientImpl.class, clusterClient),
"clusterClient");
}
}
-------------------------------------------------------------------------------------------------------------
--
kind regards,
Bett Benard K.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.