I'm new to Akka Streams, and I have to build a PoC for instant messaging on
top of Akka Streams. I need help understanding what I'm missing here.


import java.net.InetSocketAddress;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Tcp;

/**
 * Created by gaurav on 28/7/17.
 */
public class Server {
   public static void main(String[] args) {
      InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 
6000);
      final Sink<Tcp.IncomingConnection, CompletionStage<Done>> handler = 
Sink.foreach(conn -> {
         System.out.println("Client connected from: " + conn.remoteAddress());
         conn.handleWith(ActorFlow.actorRef(out -> 
MyWebSocketActor.props(out)), InternalWebSocketHelper.actorMaterializer());
      });
      final CompletionStage<Tcp.ServerBinding> bindingFuture = 
Tcp.get(InternalWebSocketHelper.actorSystem())
            .bind(serverAddress.getHostString(), 
serverAddress.getPort()).to(handler).run(InternalWebSocketHelper.actorMaterializer());

      bindingFuture.whenComplete((binding, throwable) -> {
         System.out.println("Server started, listening on: " + 
binding.localAddress());
      });

      bindingFuture.exceptionally(e -> {
         System.err.println("Server could not bind to " + serverAddress + " : " 
+ e.getMessage());
         InternalWebSocketHelper.actorSystem().terminate();
         return null;
      });

   }

   private static class MyWebSocketActor extends UntypedActor {

      private final ActorRef out;

      public MyWebSocketActor(ActorRef out) {
         this.out = out;
      }

      public static Props props(ActorRef out) {
         return Props.create(MyWebSocketActor.class, () -> new 
MyWebSocketActor(out));
      }

      public void onReceive(Object message) throws Exception {
         out.tell(message, ActorRef.noSender());
      }
   }
}








/**
 * Created by gaurav on 1/8/17.
 */

import java.util.function.Function;

import org.reactivestreams.Publisher;

import akka.NotUsed;
import akka.actor.*;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.*;

/**
 * Builds a {@link Flow} whose elements are processed by an actor: inbound elements
 * are delivered to the actor as messages, and whatever the actor sends to the
 * {@code ActorRef} it was created with is emitted downstream.
 */
public class ActorFlow {

   /**
    * Creates an actor-backed flow with a buffer of 1000 elements, the
    * {@code dropNew} overflow strategy, and the actor system and materializer
    * from {@code InternalWebSocketHelper}.
    *
    * @param props maps the outbound {@code ActorRef} to the props of the handling actor
    * @return a flow backed by the actor described by {@code props}
    */
   public static <In, Out> Flow<In, Out, NotUsed> actorRef(Function<ActorRef, Props> props) {
      return actorRef(props, 1000, OverflowStrategy.dropNew(),
            InternalWebSocketHelper.actorSystem(),
            InternalWebSocketHelper.actorMaterializer());
   }

   /**
    * Creates an actor-backed flow.
    *
    * <p>The outbound source and the wrapping actor are created immediately when
    * this method is called, not when the returned flow is materialized.
    * NOTE(review): the publisher is created WITHOUT_FANOUT, so the returned flow
    * is presumably meant to be materialized at most once - confirm.
    *
    * @param props            maps the outbound {@code ActorRef} to the props of the handling actor
    * @param bufferSize       buffer size of the outbound {@code Source.actorRef}
    * @param overflowStrategy strategy applied when the outbound buffer is full
    * @param factory          factory used to create the wrapping actor
    * @param mat              materializer used to run the outbound source
    * @return a flow whose sink side feeds the actor and whose source side emits its output
    */
   public static <In, Out> Flow<In, Out, NotUsed> actorRef(Function<ActorRef, Props> props,
         int bufferSize, OverflowStrategy overflowStrategy,
         ActorRefFactory factory, Materializer mat) {

      // Run the outbound side up front to get the ActorRef that the handling
      // actor must be constructed with.
      Pair<ActorRef, Publisher<Out>> pair = Source.<Out> actorRef(bufferSize, overflowStrategy)
            .toMat(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), Keep.both())
            .run(mat);

      ActorRef wrapper = factory.actorOf(
            Props.create(WebSocketFlowActor.class, () -> new WebSocketFlowActor(props, pair.first())));

      return Flow.fromSinkAndSource(
            Sink.actorRef(wrapper, new Status.Success(new Object())),
            Source.fromPublisher(pair.second()));
   }

   /**
    * Parent of the user-supplied actor. Forwards stream elements to it and stops
    * it (and then itself) once the inbound stream has finished.
    */
   private static class WebSocketFlowActor extends UntypedActor {

      private final ActorRef flowActor;

      public WebSocketFlowActor(Function<ActorRef, Props> props, ActorRef ref) {
         flowActor = context().watch(context().actorOf(props.apply(ref), "flowActor"));
      }

      @Override
      public void onReceive(Object message) throws Throwable {
         if (message instanceof Status.Success || message instanceof Status.Failure) {
            // Sink.actorRef signals normal completion with the configured
            // Status.Success and upstream failure with Status.Failure. Either
            // way the inbound stream is over, so stop the child.
            flowActor.tell(PoisonPill.getInstance(), getSelf());
         } else if (message instanceof Terminated) {
            context().stop(getSelf());
         } else {
            flowActor.tell(message, getSelf());
         }
      }

      @Override
      public SupervisorStrategy supervisorStrategy() {
         return SupervisorStrategy.stoppingStrategy();
      }
   }
}






import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;

/**
 * Holder for the single {@link ActorSystem} and {@link ActorMaterializer}
 * shared by the classes in this package. Both are created when this class is
 * initialized.
 */
public final class InternalWebSocketHelper {

   // final: the shared instances must not be swapped out after start-up.
   static final ActorSystem actorSystem = ActorSystem.create();
   static final ActorMaterializer actorMaterializer = ActorMaterializer.create(actorSystem);

   private InternalWebSocketHelper() {
      // static holder, not meant to be instantiated
   }

   /** @return the shared actor system */
   static ActorSystem actorSystem() {
      return actorSystem;
   }

   /** @return the materializer bound to the shared actor system */
   static ActorMaterializer actorMaterializer() {
      return actorMaterializer;
   }
}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to