Main class for Subscriber: Application.java
package com.mynamespace;
import org.springframework.boot.SpringApplication;import
org.springframework.boot.autoconfigure.SpringBootApplication;import
org.springframework.context.ApplicationContext;import
org.springframework.context.annotation.ComponentScan;
import akka.actor.ActorRef;import akka.actor.ActorSystem;import
akka.actor.Props;import akka.contrib.pattern.DistributedPubSubExtension;import
akka.contrib.pattern.DistributedPubSubMediator;
import com.mynamespace.actors.SubscriberActor;
/**
 * Spring Boot entry point for the subscriber node.
 *
 * <p>Starts the Spring context, looks up the {@link ActorSystem} bean, creates the
 * {@code subscriber} actor and announces it to the distributed pub-sub mediator
 * with a {@code Put} message.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    /**
     * Boots the application and registers the subscriber actor with the mediator.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     * @throws InterruptedException if the trailing sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        final ApplicationContext springContext = SpringApplication.run(Application.class, args);

        // The actor system is obtained from the Spring context rather than created here.
        final ActorSystem actorSystem = springContext.getBean(ActorSystem.class);
        final ActorRef pubSubMediator = DistributedPubSubExtension.get(actorSystem).mediator();

        final Props subscriberProps = Props.create(SubscriberActor.class);
        final ActorRef subscriberRef = actorSystem.actorOf(subscriberProps, "subscriber");

        // Put hands the actor reference to the mediator; note that this is not a topic
        // subscription (no Subscribe message is sent anywhere in this class).
        final DistributedPubSubMediator.Put registration =
                new DistributedPubSubMediator.Put(subscriberRef);
        pubSubMediator.tell(registration, subscriberRef);

        System.out.println("Running.");
        Thread.sleep(5000L);
    }
}
Subscriber actor: SubscriberActor.java
package com.mynamespace.actors;
import java.util.ArrayList;import java.util.List;
import akka.actor.UntypedActor;
import com.mynamespace.message.CategoryServiceRequest;import
com.mynamespace.message.CategoryServiceResponse;
/**
 * Actor that answers {@link CategoryServiceRequest} messages with a fixed list of
 * categories.
 *
 * <p>Message handling:
 * <ul>
 *   <li>{@code CategoryServiceRequest}: replies to the sender with a
 *       {@link CategoryServiceResponse} holding "Food" and "Fruits";</li>
 *   <li>the string {@code "init"}: prints a marker line;</li>
 *   <li>anything else: prints a notice that the message was not handled.</li>
 * </ul>
 */
public class SubscriberActor extends UntypedActor {

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof CategoryServiceRequest) {
            replyWithCategories();
            return;
        }
        // "init".equals(msg) is false for null and for any non-String message.
        if ("init".equals(msg)) {
            System.out.println("init called");
            return;
        }
        System.out.println("Unhandelled message received for getCategories.");
    }

    /** Builds the hard-coded category list and sends it back to the current sender. */
    private void replyWithCategories() {
        System.out.println("Request received for GetCategories.");
        CategoryServiceResponse reply = new CategoryServiceResponse();
        List<String> names = new ArrayList<>();
        names.add("Food");
        names.add("Fruits");
        reply.setCatgories(names);
        getSender().tell(reply, getSelf());
    }
}
Application.conf for subscriber
# Akka settings for this node: logging, the distributed pub-sub extension,
# the cluster actor-ref provider, TCP remoting and the cluster seed nodes.
akka {
loglevel = INFO
stdout-loglevel = INFO
loggers = ["akka.event.slf4j.Slf4jLogger"]
# Lists the pub-sub extension under "extensions" (presumably so it is loaded
# when the actor system starts - confirm against the Akka version in use).
extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
# NOTE(review): port 0 presumably requests an automatically chosen port, so this
# node would not itself listen on 2551/2552 - confirm the seed nodes are started
# separately on those ports.
port = 0
}
}
cluster {
# NOTE(review): "[email protected]" in the addresses below looks like an e-mail
# obfuscation artifact from the list archive; the expected form is presumably
# akka.tcp://<ActorSystemName>@<host>:<port> - restore the real system name and host.
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]
auto-down-unreachable-after = 10s
}
}
Main class for publisher: Application.java
package com.mynamespace;
import org.springframework.boot.SpringApplication;import
org.springframework.boot.autoconfigure.SpringBootApplication;import
org.springframework.context.ApplicationContext;import
org.springframework.context.annotation.ComponentScan;
import akka.actor.ActorRef;import akka.actor.ActorSystem;import
akka.actor.Props;import akka.contrib.pattern.DistributedPubSubExtension;import
akka.contrib.pattern.DistributedPubSubMediator;
import com.mynamespace.actors.PublisherActor;
/**
 * Spring Boot entry point for the publisher node.
 *
 * <p>Starts the Spring context, looks up the {@link ActorSystem} bean, creates the
 * {@code publisher} actor, announces it to the distributed pub-sub mediator with a
 * {@code Put} message and, after a fixed pause, sends the actor a {@code "hi"} string.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    /**
     * Boots the application and sends the publisher actor its trigger message.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     * @throws InterruptedException if the pause before the trigger is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        final ApplicationContext springContext = SpringApplication.run(Application.class, args);

        // The actor system is obtained from the Spring context rather than created here.
        final ActorSystem actorSystem = springContext.getBean(ActorSystem.class);
        final ActorRef pubSubMediator = DistributedPubSubExtension.get(actorSystem).mediator();

        final Props publisherProps = Props.create(PublisherActor.class);
        final ActorRef publisherRef = actorSystem.actorOf(publisherProps, "publisher");

        final DistributedPubSubMediator.Put registration =
                new DistributedPubSubMediator.Put(publisherRef);
        pubSubMediator.tell(registration, publisherRef);

        // Fixed pause before triggering the publisher; presumably meant to give the
        // cluster time to form - TODO confirm, a fixed delay is not a guarantee.
        Thread.sleep(5000);

        publisherRef.tell("hi", publisherRef);
        System.out.println("Running.");
    }
}
PublisherActor.java
package com.mynamespace.actors;
import scala.concurrent.Future;import akka.actor.ActorRef;import
akka.actor.UntypedActor;import
akka.contrib.pattern.DistributedPubSubExtension;import
akka.contrib.pattern.DistributedPubSubMediator;import
akka.dispatch.Mapper;import akka.pattern.Patterns;import akka.util.Timeout;
import com.mynamespace.message.CategoryServiceRequest;import
com.mynamespace.message.CategoryServiceResponse;
public class PublisherActor extends UntypedActor {
// activate the extension
ActorRef mediator = DistributedPubSubExtension.get(getContext().system())
.mediator();
public void onReceive(Object msg) {
if (msg instanceof String) {
Timeout timeOut = new Timeout(50000l);
mediator.tell(new DistributedPubSubMediator.Send(
"/user/subscriber", new CategoryServiceRequest()),
getSelf());
Future<Object> response = Patterns.ask(mediator,
new DistributedPubSubMediator.Send("/user/subscriber",
new CategoryServiceRequest()), timeOut);
Future<CategoryServiceResponse> finalresponse = response.map(
new Mapper<Object, CategoryServiceResponse>() {
@Override
public CategoryServiceResponse apply(Object parameter) {
CategoryServiceResponse responseFromRemote =
(CategoryServiceResponse) parameter;
System.out.println("received:: list of size:: "
+ responseFromRemote.getCatgories().size());
return responseFromRemote;
}
}, getContext().system().dispatcher());
} else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
System.out.println("subscribbed.......");
} else {
unhandled(msg);
}
}}
The application.conf for the publisher is the same as the subscriber's. Both
run on different ports on the same machine.
I have two seed nodes defined and running on my local system. However, I am
not able to ASK/TELL the subscriber from the publisher (each running on a
different node) via the DistributedPubSub mediator.
After starting the subscriber and then the publisher, I don't get any
exceptions or dead-letter messages printed in stdout/logs.
Is it possible to view which actor references my mediator holds?
I need help finding the issue, or at least the likely causes.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.