Hi Federico, Could you please provide me access to your blog? I have a similar requirement to consume messages from the event stream and respond to the client (browser) over a WebSocket connection. I am able to create a WebSocket connection and respond to the client when it asks for information.
I am wondering how to send the message to the client whenever any message pushed to Akka event stream, without the client specifically asking for the message. The server should be able to push the message to the client over WebSocket as soon as a message is published to the event stream. Also, how do I club both the request/response cycle (i,e responding to a client when a client asks for data) and then sending the data to the client when there is anything published to event stream. Is it possible to club both of these in one Route? Please help me with this. Also if you could provide me access to your blog that will be great. I referred below links but didn't get an idea. https://doc.akka.io/docs/akka-http/current/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.html https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#client-side-websocket-support Thanks Alok On Thursday, May 12, 2016 at 12:05:31 PM UTC-4, Federico Jakimowicz wrote: > > Hi Flavio, > Yesterday looking at this example > > http://liferepo.blogspot.com.ar/2015/01/creating-reactive-streams-components-on.html > and the code sample you published in your first post I managed to get what > you are describing here. > I mean that the publisher just publishes to the client flow and the > subscriber is just subscribed to the client flow and there is no relation > between them, so then I understood the issue you posted in the first post. > I meant i found not solution but at least i managed to understand what you > were at ( which for me was somewhat an achievement hehehe ). Then i had to > go to sleep :). > I will read the code you posted today and the solution to your issue if > you post it too! > > thanks a lot for the in detail explanation! > > Fede. > > > El jueves, 12 de mayo de 2016, 7:29:54 (UTC-3), Flavio escribió: >> >> Hello Federico >> >> >> >> Below my code. >> >> >> The class WebSocketApp is the Http App. The Method metrics returns the >> required Flow. This flow forwards all incoming messages to a newly created >> Actor of type WebSocketCommandSubscriber. The flow gets the messages to >> send back to the websocket client from newly created Actor of Type >> WebSocketDataPublisherActor. The Actor WebSocketDataPublisherActor receives >> the sending events from the Eventstream. If the websocket-stream is not >> ready to receive data (keyword backpressure) incoming messages from the >> Eventstream are lost and never send to the websocket client. For >> each connection this two Actors are created once. >> >> >> >> With this solution it is NOT possible to implement a request/response >> handling, because the WebSocketDataPublisherActor is listening on the >> Eventstream. I have solved this problem too and I will publish the code to >> my >> own question >> <https://groups.google.com/forum/#!topic/akka-user/CkbaYINPbkU> later. >> >> >> >> Have fun! >> >> Flavio >> >> >> Ps. A starting point to understand steams could be: >> https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 >> >> >> >> >> >> public class WebSocketApp extends HttpApp { >> >> >> >> private static final Gson gson = new Gson(); >> >> >> >> @Override >> >> public Route createRoute() { >> >> return get( >> >> path("metrics").route(handleWebSocketMessages(metrics())) >> >> ); >> >> } >> >> >> >> private Flow<Message, Message, ?> metrics() { >> >> Sink<Message, ActorRef> metricsSink = >> Sink.actorSubscriber(WebSocketCommandSubscriber.props()); >> >> Source<Message, ActorRef> metricsSource = >> >> Source.actorPublisher(WebSocketDataPublisherActor.props()) >> >> .map((measurementData) -> >> TextMessage.create(gson.toJson(measurementData))); >> >> return Flow.fromSinkAndSource(metricsSink, metricsSource); >> >> } >> >> } >> >> >> >> >> public class WebSocketCommandSubscriber extends AbstractActorSubscriber { >> >> >> >> public static Props props() { >> >> return Props.create(WebSocketCommandSubscriber.class); >> >> } >> >> >> >> public WebSocketCommandSubscriber() { >> >> receive(ReceiveBuilder. >> >> match(ActorSubscriberMessage.OnNext.class, on -> on.element() >> instanceof Message, >> >> onNext -> { >> >> Message message = (Message)onNext.element(); >> >> handleIncomingMessage(message); >> >> >> >> // TODO: how do we handle OnComplete? (do we have to / why >> exception) >> >> // }) >> >> // .match(akka.stream.actor.ActorSubscriberMessage.OnComplete., >> (x) -> { >> >> // context().system().stop(self()); >> >> }).match(Object.class, (x) -> { >> >> System.out.println("WebSocketCommandSubscriber: Unkown >> incomming message: " + x.getClass().getName() + ": " + x); >> >> unhandled(x); >> >> }).build()); >> >> } >> >> >> >> private void handleIncomingMessage(Message message) { >> >> WebSocketMessage<?> wsCommand = new >> Gson().fromJson(message.asTextMessage().getStrictText(), >> WebSocketMessage.class); >> >> switch (wsCommand.type) { >> >> case "DoX": >> >> >> getConnectionsActor().tell(XyActor.doX(createRemoteAddress(wsCommand)), >> ActorRef.noSender()); >> >> break; >> >> case "DoY": >> >> >> getConnectionsActor().tell(XyActor.doY(createRemoteAddress(wsCommand)), >> ActorRef.noSender()); >> >> break; >> >> } >> >> } >> >> // TODO: we should refactor our messaging concept - it does not work >> that nice as expected >> >> private RemoteAddress createRemoteAddress(WebSocketMessage<?> >> wsCommand) { >> >> @SuppressWarnings("rawtypes") >> >> LinkedTreeMap map = (LinkedTreeMap) wsCommand.data; >> >> return new RemoteAddress(map.get("host").toString(), >> Double.valueOf(map.get("port").toString()).intValue(), >> map.get("actorSystemName").toString()); >> >> } >> >> private NodeName createNodeName(WebSocketMessage<?> wsCommand) { >> >> @SuppressWarnings("rawtypes") >> >> LinkedTreeMap map = (LinkedTreeMap) wsCommand.data; >> >> return new NodeName(map.get("nodeName").toString()); >> >> } >> >> >> >> private ActorSelection getConnectionsActor() { >> >> return context().system().actorSelection(Connections.ACTOR_PATH); >> >> } >> >> private ActorSelection getConnectionActor(WebSocketMessage<?> >> wsCommand) { >> >> return >> context().system().actorSelection(NodeConnection.getActorPath(createNodeName(wsCommand))); >> >> } >> >> >> >> @Override >> >> public RequestStrategy requestStrategy() { >> >> return new MaxInFlightRequestStrategy(10) { >> >> @Override >> >> public int inFlightInternally() { >> >> // we do not hold any messages yet, but will eventually be >> >> // required, e.g. for request/response message handling >> >> return 0; >> >> } >> >> }; >> >> } >> >> } >> >> >> >> >> >> public class WebSocketDataPublisher extends >> AbstractActorPublisher<WebSocketMessage<?>> { >> >> >> >> // TODO: the WebSocket client should be able to configure its >> interessted events, by sending a corresponding message >> >> List<Class<?>> interesstedEvents = Arrays.asList( >> >> MeasurementDataMessage.class, >> >> NodeConnectionEvents.class); >> >> >> >> public static Props props() { >> >> return Props.create(WebSocketDataPublisher.class); >> >> } >> >> >> >> @Override >> >> public void preStart() throws Exception { >> >> for (Class<?> eventClass : interesstedEvents) { >> >> // unsubscribing performed automatically by the event stream >> on actor destroy >> >> getContext().system().eventStream().subscribe(self(), >> eventClass); >> >> } >> >> } >> >> >> >> public WebSocketDataPublisher() { >> >> UnitPFBuilder<Object> builder = >> ReceiveBuilder.match(Cancel.class, cancel -> context().stop(self())); >> >> for (Class<?> clazz : interesstedEvents) { >> >> builder = builder.match(clazz, message -> { >> >> handleMessage(message); >> >> }); >> >> } >> >> receive(builder.build()); >> >> } >> >> >> >> private void handleMessage(Object message) { >> >> // while the stream is not ready to receive data - incoming >> messages are lost >> >> if (isActive() && totalDemand() > 0) { >> >> WebSocketMessage<?> webSocketMessage = >> WebSocketMessage.create(message.getClass().getSimpleName(), message); >> >> // System.out.println("send message to WS: " + message); >> >> onNext(webSocketMessage); >> >> } else { >> >> // System.out.println("LOST message to WS: " + message); >> >> } >> >> } >> >> } >> >> >> >> >> public class WebSocketMessage<T> { >> >> public final String type; >> >> public final T data; >> >> >> >> public static WebSocketMessage<Void> create(String type) { >> >> return create(type, null); >> >> } >> >> >> >> public static <T> WebSocketMessage<T> create(String type, T data) { >> >> return new WebSocketMessage<>(type, data); >> >> } >> >> >> >> private WebSocketMessage(String type, T data) { >> >> this.type = type; >> >> this.data = data; >> >> } >> >> } >> >> >> > -- ***************************************************************************************************** ** New discussion forum: https://discuss.akka.io/ replacing akka-user google-group soon. ** This group will soon be put into read-only mode, and replaced by discuss.akka.io ** More details: https://akka.io/blog/news/2018/03/13/discuss.akka.io-announced ***************************************************************************************************** >>>>>>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/akka-user/96d90b2c-017f-49d6-8ccb-b308490c348b%40googlegroups.com.
