Hi Benard,

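one detail that caught my eye is the configuration setting for the receptionist
name in the startServer method of your Main class. Its key should start with
*akka*, i.e. akka.contrib.cluster.receptionist.name rather than
contrib.cluster.receptionist.name.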

Documentation on this setting here:
http://doc.akka.io/docs/akka/snapshot/contrib/cluster-client.html#ClusterReceptionistExtension

If that is not the source of your problem, let me know what messages you
seem to be losing.


On Mon, Apr 14, 2014 at 7:08 PM, benard bett <[email protected]> wrote:

> Hi,
>
> I've been trying to test the cluster client feature on the new akka
> 2.3.2 version, and I've been finding an 'abnormal' behavior, partly because
> of inexperience and partly because of the shallow scenarios available to
> show it practically.
>
> Here's my 2-cents scenario to simulate the issue; kindly run it and advise on
> the same.
>
>
> ------------------------------------------------------------------------------------------------------------
> 1. Class : ClusterinProtocol.java
>
> import java.io.Serializable;
>
> public abstract interface ClusterinProtocol {
>
> public static final class Ping implements Serializable {
>
> public final int count;
>
> public Ping(int count) {
> this.count = count;
>  }
>
> @Override
> public String toString() {
>  return "Ping{count='" + count + "'}";
> }
> };
>
> public static final class Pong implements Serializable {
>
> public final int count;
>
> public Pong(int count) {
> this.count = count;
> }
>
> @Override
> public String toString() {
> return "Pong{count='" + count + "'}";
>  }
> };
>
> }
>
>
>
>
>
> ------------------------------------------------------------------------------------------------------------
> 2. Class : ClusterShardImpl.java
>
> import java.io.Serializable;
> import java.util.ArrayList;
> import java.util.List;
>
> import akka.cluster.Cluster;
> import akka.cluster.ClusterEvent;
> import akka.cluster.ClusterEvent.MemberEvent;
> import akka.cluster.ClusterEvent.MemberRemoved;
> import akka.cluster.ClusterEvent.MemberUp;
> import akka.cluster.ClusterEvent.UnreachableMember;
> import akka.event.Logging;
> import akka.event.LoggingAdapter;
> import akka.japi.Procedure;
> import akka.persistence.SnapshotOffer;
> import akka.persistence.UntypedEventsourcedProcessor;
>
> public class ClusterShardImpl extends UntypedEventsourcedProcessor {
>
> private LoggingAdapter log = Logging.getLogger(getContext().system(),
> this);
> private Cluster cluster = Cluster.get(getContext().system());
>  private PingTracker tracker = new PingTracker();
>
> public int getPingRequests() {
> return tracker.size();
>
> }
>
> @Override
> public void preStart() {
>  try {
> super.preStart();
> } catch (Exception e) {
>  e.printStackTrace();
> }
> cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
>  MemberEvent.class, UnreachableMember.class);
> }
>
> @Override
>  public void postStop() {
> super.postStop();
> cluster.unsubscribe(getSelf());
>  }
>
> public ClusterShardImpl() {
> log.info("Starting...........");
>  }
>
> @Override
> public void onReceiveRecover(Object message) {
>  if (message instanceof SnapshotOffer) {
> tracker = (PingTracker) ((SnapshotOffer) message).snapshot();
>  } else if (message instanceof ClusterinProtocol.Ping) {
> tracker.update((ClusterinProtocol.Ping) message);
>  }
> }
>
> int count = 0;
>
>  @Override
> public void onReceiveCommand(Object message) {
> log.info("Server : {}", message);
>  if (message instanceof ClusterinProtocol.Ping) {
> count++;
> ClusterinProtocol.Ping ping = (ClusterinProtocol.Ping) message;
>  persist(ping, new Procedure<ClusterinProtocol.Ping>() {
> @Override
> public void apply(ClusterinProtocol.Ping msg) {
>  tracker.update(msg);
> getContext().system().eventStream().publish(msg);
> getSender().tell(new ClusterinProtocol.Pong(count),
>  getSelf());
> log.info("Server Pong Response count : {}", count);
>  }
> });
>
> } else if (message.equals("snap")) {
>  saveSnapshot(tracker.pingList);
> } else if (message.equals("print")) {
> System.out.println(tracker.toString());
>  } else if (message instanceof MemberUp) {
> MemberUp mUp = (MemberUp) message;
> log.info("Member is Up: {}", mUp.member());
>
> } else if (message instanceof UnreachableMember) {
> UnreachableMember mUnreachable = (UnreachableMember) message;
>  log.info("Member is unreachable: {}", mUnreachable.member());
>
> } else if (message instanceof MemberRemoved) {
>  MemberRemoved mRemoved = (MemberRemoved) message;
> log.info("Member is removed: {}", mRemoved.member());
>
> } else if (message instanceof MemberEvent) {
> // ignore
>
> } else
>  unhandled(message);
>
> }
>
> class PingTracker implements Serializable {
>  private final List<ClusterinProtocol.Ping> pingList;
>
> PingTracker() {
> this(new ArrayList<ClusterinProtocol.Ping>());
>  }
>
> PingTracker(List<ClusterinProtocol.Ping> pingList) {
> this.pingList = pingList;
>  }
>
> PingTracker copy() {
> return new PingTracker(new ArrayList<>(pingList));
>  }
>
> void update(ClusterinProtocol.Ping ping) {
> pingList.add(ping);
>  }
>
> int size() {
> return pingList.size();
>  }
>
> @Override
> public String toString() {
>  return pingList.toString();
> }
> }
>
> }
>
>
>
>
> -------------------------------------------------------------------------------------------------------------
> 3. Class : ClusterClientImpl.java
>
> import akka.actor.ActorRef;
> import akka.actor.UntypedActor;
> import akka.contrib.pattern.ClusterClient;
> import akka.event.Logging;
> import akka.event.LoggingAdapter;
>
> public class ClusterClientImpl extends UntypedActor {
>
> private LoggingAdapter log = Logging.getLogger(getContext().system(),
> this);
>  private final ActorRef clusterClient; // , channel;
>
> public ClusterClientImpl(ActorRef clusterClient) {
>  this.clusterClient = clusterClient;
>
> log.info("Starting...........");
>  }
>
> int sent = 0, received = 0;
>
> @Override
>  public void onReceive(Object message) {
> if (message == Tick) {
> sent++;
>  clusterClient.tell(new ClusterClient.Send("/user/clusterShard",
> new ClusterinProtocol.Ping(sent)), getSelf());
>
> } else if (message instanceof ClusterinProtocol.Pong) {
> received++;
>
> } else
>  unhandled(message);
>
> log.info("Sent : " + sent + "\tReceived : " + received
>  + " : Received message : [{}]", message);
>
> }
>
> public static final Object Tick = new Object() {
>
> @Override
> public String toString() {
> return "Tick";
>  }
> };
>
> }
>
>
>
>
>
> -------------------------------------------------------------------------------------------------------------
> 4. Class : Main.java
>
> import java.util.HashSet;
> import java.util.Set;
>
> import akka.actor.ActorRef;
> import akka.actor.ActorSelection;
> import akka.actor.ActorSystem;
> import akka.actor.Address;
> import akka.actor.Props;
> import akka.contrib.pattern.ClusterClient;
> import akka.contrib.pattern.ClusterReceptionistExtension;
>
> import com.typesafe.config.Config;
> import com.typesafe.config.ConfigFactory;
>
> public class Main {
>
> public static void main(String[] args) {
> startServer(new String[] { "2551", "2552" });
>
> startClient();
>
> }
>
> private static void startServer(String[] ports) {
>  for (String port : ports) {
> Config conf = ConfigFactory
> .parseString(
>  "akka.actor.provider=akka.cluster.ClusterActorRefProvider\n"
> + "akka.cluster.log-remote-lifecycle-events=on\n"
>  + "akka.cluster.seed-nodes=[\"akka.tcp://[email protected]:2551\",
> "
> + "\"akka.tcp://[email protected]:2552\"]\n"
>  + "akka.cluster.auto-down-unreachable-after=10s\n"
> + "akka.remote.netty.tcp.hostname=127.0.0.1\n"
>  + "akka.remote.netty.tcp.port="
> + port
> + "\ncontrib.cluster.receptionist.name = receptionist"
>  +
> "\nakka.extensions=[akka.contrib.pattern.ClusterReceptionistExtension]")
> .withFallback(ConfigFactory.load());
>
> ActorSystem system = ActorSystem.create("ClusterServer", conf);
> ActorRef actor = system.actorOf(
>  Props.create(ClusterShardImpl.class), "clusterShard");
> ClusterReceptionistExtension.get(system).registerService(actor);
>  }
> }
>
> private static void startClient() {
>  Config conf = ConfigFactory.parseString(
> "akka.actor.provider=akka.cluster.ClusterActorRefProvider")
>  // +
> // "\nakka.extensions=[akka.contrib.pattern.ClusterReceptionistExtension]"
> .withFallback(ConfigFactory.load());
>  ActorSystem system = ActorSystem.create("ClusterClient", conf);
>
> Address clusterAddress = new Address("akka.tcp", "ClusterServer",
>  "127.0.0.1", 2551);
> Set<ActorSelection> initialContacts = new HashSet<>();
> initialContacts.add(system.actorSelection(clusterAddress
>  + "/user/receptionist"));
> ActorRef clusterClient = system.actorOf(ClusterClient
> .defaultProps(initialContacts));
>  system.actorOf(Props.create(ClusterClientImpl.class, clusterClient),
> "clusterClient");
>
>  }
>
> }
>
>
>
> -------------------------------------------------------------------------------------------------------------
>
>
>
> --
> kind regards,
> Bett Benard K.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Martynas Mickevičius
Typesafe <http://typesafe.com/> –
Reactive <http://www.reactivemanifesto.org/> Apps on the JVM

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to