Hi,

I have been trying to use akka streams to process our Kafka based micro 
service.  One micro service would get further data from an older REST based 
service.  I have put in some Java code below (sorry not doing Scala for 
this code base, as I am working with another developer that would make 
learning a new language, architecture, etc. overwhelming).  Below is a 
simplified code that gets a message from kafka, broadcast/fanout, a path 
goes to http akka, then zip and creates a new kafka record that is sent to 
the kafka producer.  This works fine if the REST based service has a 
content-length.  Unfortunately its an older existing one, it just spits out 
data even though its not really streaming data, all http responses are 
chunked.  So the code below will just timeout when I force a Strict.  Works 
ok for REST api with content-length.

I need some help in deciding where to go:

1. Fix the old service to have a content-length.  Not ideal as its being 
used by other systems in production. There might be downstream services 
that count on the old chunked behaviour.
2. Change the flow so passes the Source<ByteString> down the stream, then 
consume when needed.  I would like to avoid this, as it would a nice to 
keep requestData to be Flow[Message, String].  So we can actually just 
plugin other flows.
3. Somehow force the StrictEntity to not timeout, I don't know how to do 
this.

Are there other options I can do?  Is Option #2 the best thing to do?  This 
way its clear than requestData is really a Source of data, and not hide it. 
 But does this mean I actually have 2 graphs instead of 1?  1 graph it get 
the message from kafka and gives a source.  The other graph is get the 
source and push messages to kafka.

Thanks, some code snippets below if the above is confusing.

Jun



final RunnableGraph<NotUsed> result =
      RunnableGraph.fromGraph(
         GraphDSL.create(
            builder -> {
               final UniformFanOutShape<Message, Message> bcast = 
builder.add(Broadcast.create(2));
               final FanInShape2<String, Message, Pair<String, Message>> zip = 
builder.add(Zip.create());

               final Outlet<ConsumerRecord<String, Optional<Message>>> source = 
builder.add(kafkaConsumer).out();
               builder.from(source).via(builder.add(extractMessage))
                     .viaFanOut(bcast);

               
builder.from(bcast).via(builder.add(requestData)).toInlet(zip.in0());
               
builder.from(bcast).via(builder.add(passMessage)).toInlet(zip.in1());

               final SinkShape<ProducerRecord<String, MessageI>> sink = 
builder.add(kafkaProducer);
               
builder.from(zip.out()).via(builder.add(toProducerRecord)).to(sink);

               return ClosedShape.getInstance();
            }));



final Flow<Message, String, NotUsed> requestDataFromRestApi = 
Flow.<Message>create().map(message -> processMessage(message))
                                                   .map(instrument -> 
HttpRequest.create("/someapi/" + instrument)).via(connectionFlow);



       final Flow<HttpRequest, String, CompletionStage<OutgoingConnection>> 
connectionFlow =
            Http.get(system).outgoingConnection(ConnectHttp.toHost("localhost", 
9007))
                  .map(response -> {

                     final Source<ByteString, Object> dataBytes = 
response.entity().getDataBytes();

                     final Sink<ByteString, CompletionStage<String>> sink = 
Sink.fold("", (body, bytes) -> {
                        return body + bytes.decodeString("UTF-8");
                     });

                     final CompletionStage<String> completionStage = 
dataBytes.runWith(sink, KafkaConnector.materializer);
                     final String body = 
completionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);

                     return body;
                  });

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to