Main class for Subscriber: Application.java

package com.mynamespace;
import org.springframework.boot.SpringApplication;import 
org.springframework.boot.autoconfigure.SpringBootApplication;import 
org.springframework.context.ApplicationContext;import 
org.springframework.context.annotation.ComponentScan;
import akka.actor.ActorRef;import akka.actor.ActorSystem;import 
akka.actor.Props;import akka.contrib.pattern.DistributedPubSubExtension;import 
akka.contrib.pattern.DistributedPubSubMediator;
import com.mynamespace.actors.SubscriberActor;
@SpringBootApplication@ComponentScan(basePackages = "com.mynamespace.*")   
public class Application {

    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(Application.class, args);
        // get hold of the actor system
        ActorSystem system = ctx.getBean(ActorSystem.class);
        ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
        ActorRef subscriber = system.actorOf(
                Props.create(SubscriberActor.class), "subscriber");
       // subscribe to the topic named "content"
        mediator.tell(new DistributedPubSubMediator.Put(subscriber), 
subscriber);
        // subscriber.tell("init", null);
        System.out.println("Running.");
        Thread.sleep(5000l);
    }}

Subscriber actor: SubscriberActor.java

package com.mynamespace.actors;
import java.util.ArrayList;import java.util.List;
import akka.actor.UntypedActor;
import com.mynamespace.message.CategoryServiceRequest;import 
com.mynamespace.message.CategoryServiceResponse;
public class SubscriberActor extends UntypedActor {

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof CategoryServiceRequest) {
            System.out.println("Request received for GetCategories.");
            CategoryServiceResponse response = new CategoryServiceResponse();
            List<String> categories = new ArrayList<>();
            categories.add("Food");
            categories.add("Fruits");
            response.setCatgories(categories);
            getSender().tell(response, getSelf());
        } else if (msg instanceof String && msg.equals("init")) {
            System.out.println("init called");
        } else {
            System.out
                .println("Unhandelled message received for getCategories.");
        }
    }
}

Application.conf for subscriber

akka {
    loglevel = INFO
    stdout-loglevel = INFO
    loggers = ["akka.event.slf4j.Slf4jLogger"]
    extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
       enabled-transports = ["akka.remote.netty.tcp"]
       netty.tcp {
         hostname = "127.0.0.1"
         port = 0
       }
     }

     cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551",
      "akka.tcp://[email protected]:2552"]

    auto-down-unreachable-after = 10s
    }
}

Main class for publisher: Application.java

package com.mynamespace;
import org.springframework.boot.SpringApplication;import 
org.springframework.boot.autoconfigure.SpringBootApplication;import 
org.springframework.context.ApplicationContext;import 
org.springframework.context.annotation.ComponentScan;
import akka.actor.ActorRef;import akka.actor.ActorSystem;import 
akka.actor.Props;import akka.contrib.pattern.DistributedPubSubExtension;import 
akka.contrib.pattern.DistributedPubSubMediator;
import com.mynamespace.actors.PublisherActor;
@SpringBootApplication@ComponentScan(basePackages = "com.mynamespace.*")public 
class Application {

    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(Application.class, args);
        // get hold of the actor system
        ActorSystem system = ctx.getBean(ActorSystem.class);
        ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
        ActorRef publisher = system.actorOf(Props.create(PublisherActor.class),
            "publisher");
        mediator.tell(new DistributedPubSubMediator.Put(publisher), publisher);
        Thread.sleep(5000);
        publisher.tell("hi", publisher);
        System.out.println("Running.");
    }}

PublisherActor.java

package com.mynamespace.actors;
import scala.concurrent.Future;import akka.actor.ActorRef;import 
akka.actor.UntypedActor;import 
akka.contrib.pattern.DistributedPubSubExtension;import 
akka.contrib.pattern.DistributedPubSubMediator;import 
akka.dispatch.Mapper;import akka.pattern.Patterns;import akka.util.Timeout;
import com.mynamespace.message.CategoryServiceRequest;import 
com.mynamespace.message.CategoryServiceResponse;
public class PublisherActor extends UntypedActor {

    // activate the extension
    ActorRef mediator = DistributedPubSubExtension.get(getContext().system())
        .mediator();

    public void onReceive(Object msg) {
        if (msg instanceof String) {
            Timeout timeOut = new Timeout(50000l);
            mediator.tell(new DistributedPubSubMediator.Send(
                    "/user/subscriber", new CategoryServiceRequest()),
                    getSelf());
            Future<Object> response = Patterns.ask(mediator,
                    new DistributedPubSubMediator.Send("/user/subscriber",
                            new CategoryServiceRequest()), timeOut);
            Future<CategoryServiceResponse> finalresponse = response.map(
                    new Mapper<Object, CategoryServiceResponse>() {

                        @Override
                        public CategoryServiceResponse apply(Object parameter) {
                            CategoryServiceResponse responseFromRemote = 
(CategoryServiceResponse) parameter;
                            System.out.println("received:: list of size:: "
                                + responseFromRemote.getCatgories().size());
                            return responseFromRemote;
                        }

                    }, getContext().system().dispatcher());
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            System.out.println("subscribbed.......");

        } else {
            unhandled(msg);
        }
    }}

Application conf for publisher is same as of subscriber. Both are running 
on different ports on the same system.

I have two seed nodes defined and running on my local system. Somehow I am 
not able to ASK/TELL subscriber from producer (both running on different 
nodes) via DistributedPubSub Mediator.

After running Subscriber then publisher: I don't get any exceptions or any 
dead letter references printed in stdout/logs.

Is it possible to be able to view what actor references my mediator holds?

Need help to find issues or possible issues.

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to