I went with option 2, making a nested Source<Source<_>> (I miss using
Scala).  This works, but I get a dead letter and I am not sure what is
causing it.  I would appreciate some feedback on whether I am heading down
the correct path, or whether this is going to be painful and I might as well
shoot myself in the foot.

final Source<Source<Message, Object>, NotUsed> result =
      Source.fromGraph(
         GraphDSL.create(builder -> {
            final UniformFanOutShape<Message, Message> bcast =
                  builder.add(Broadcast.create(2));
            final FanInShape2<Source<ByteString, Object>, Message,
                  Pair<Source<ByteString, Object>, Message>> zip = builder.add(Zip.create());

            final Outlet<ConsumerRecord<String, Optional<Message>>> source =
                  builder.add(kafkaConsumer).out();
            builder.from(source).via(builder.add(extractMessage))
                  .viaFanOut(bcast);

            builder.from(bcast).via(builder.add(requestDataFromRestApi)).toInlet(zip.in0());
            builder.from(bcast).via(builder.add(passMessage)).toInlet(zip.in1());

            final FlowShape<Pair<Source<ByteString, Object>, Message>,
                  Source<Message, Object>> shape = builder.add(toProducerRecord);
            builder.from(zip.out()).via(shape);

            return SourceShape.of(shape.out());
         }));


result.runForeach(source -> {
   source.map(message -> {
            final ProducerRecord<String, Message> record =
                  new ProducerRecord<>(KafkaConnector.topic, message.correlationId(), message);
            logger.debug("sending message " + record.value().toString());
            return record;
         })
         .to(Producer.plainSink(KafkaConnector.producerSettings))
         .run(KafkaConnector.materializer);   // each inner source gets its own materialization
}, KafkaConnector.materializer);
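For reference, the flattening I am hand-rolling in the runForeach above is conceptually just a flatMap over nested streams.  A rough analogy in plain Java streams (not Akka code, with made-up data):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlattenSketch {
    // Nested streams, analogous to Source<Source<Message, ?>, NotUsed>:
    // each outer element carries its own inner stream of messages.
    public static List<String> flatten(Stream<Stream<String>> nested) {
        // One flatten step keeps everything in a single pipeline,
        // instead of running each inner stream separately.
        return nested.flatMap(inner -> inner).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Stream<String>> nested =
                Stream.of(Stream.of("msg-1", "msg-2"), Stream.of("msg-3"));
        System.out.println(flatten(nested)); // [msg-1, msg-2, msg-3]
    }
}
```

In Akka Streams the single-step equivalent is flatMapConcat (or flatMapMerge), which would let the producer sink live inside one materialized graph.  I suspect the dead letters come from the per-element run(...) above, since every inner source is materialized as its own little stream.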




On Wednesday, July 27, 2016 at 3:38:16 PM UTC+12, Jun wrote:
>
> Hi,
>
> I have been trying to use Akka Streams to process our Kafka-based micro 
> services.  One micro service needs to get further data from an older REST-based 
> service.  I have put some Java code below (sorry, not doing Scala for 
> this code base, as I am working with another developer, and learning a new 
> language, architecture, etc. at the same time would be overwhelming).  Below is 
> simplified code that gets a message from Kafka, broadcasts/fans out, one path 
> goes to Akka HTTP, then zips and creates a new Kafka record that is sent to 
> the Kafka producer.  This works fine if the REST-based service sends a 
> Content-Length.  Unfortunately it is an older existing service; it just spits out 
> data, and even though it is not really streaming data, all HTTP responses are 
> chunked.  So the code below just times out when I force a strict entity.  It works 
> fine for a REST API with Content-Length.
>
> I need some help deciding where to go:
>
> 1. Fix the old service to send a Content-Length.  Not ideal, as it is being 
> used by other systems in production, and there might be downstream services 
> that count on the old chunked behaviour.
> 2. Change the flow so it passes the Source<ByteString> down the stream, then 
> consumes it when needed.  I would like to avoid this, as it would be nice to 
> keep requestData as a Flow<Message, String>, so we can just plug in 
> other flows.
> 3. Somehow force the strict entity not to time out; I don't know how to do 
> this.
>
> Are there other options I can try?  Is option #2 the best thing to do? 
>  That way it is clear that requestData is really a Source of data, rather than 
> hiding it.  But does this mean I actually have two graphs instead of one?  One 
> graph gets the message from Kafka and gives a Source; the other graph takes 
> the Source and pushes messages to Kafka.
>
> Thanks; some code snippets are below in case the above is confusing.
>
> Jun
>
>
>
> final RunnableGraph<NotUsed> result =
>       RunnableGraph.fromGraph(
>          GraphDSL.create(builder -> {
>             final UniformFanOutShape<Message, Message> bcast =
>                   builder.add(Broadcast.create(2));
>             final FanInShape2<String, Message, Pair<String, Message>> zip =
>                   builder.add(Zip.create());
>
>             final Outlet<ConsumerRecord<String, Optional<Message>>> source =
>                   builder.add(kafkaConsumer).out();
>             builder.from(source).via(builder.add(extractMessage))
>                   .viaFanOut(bcast);
>
>             builder.from(bcast).via(builder.add(requestData)).toInlet(zip.in0());
>             builder.from(bcast).via(builder.add(passMessage)).toInlet(zip.in1());
>
>             final SinkShape<ProducerRecord<String, Message>> sink =
>                   builder.add(kafkaProducer);
>             builder.from(zip.out()).via(builder.add(toProducerRecord)).to(sink);
>
>             return ClosedShape.getInstance();
>          }));
>
>
>
> final Flow<Message, String, NotUsed> requestDataFromRestApi =
>       Flow.<Message>create()
>             .map(message -> processMessage(message))
>             .map(instrument -> HttpRequest.create("/someapi/" + instrument))
>             .via(connectionFlow);
>
>
>
> final Flow<HttpRequest, String, CompletionStage<OutgoingConnection>> connectionFlow =
>       Http.get(system).outgoingConnection(ConnectHttp.toHost("localhost", 9007))
>             .map(response -> {
>                final Source<ByteString, Object> dataBytes =
>                      response.entity().getDataBytes();
>
>                final Sink<ByteString, CompletionStage<String>> sink =
>                      Sink.fold("", (body, bytes) -> body + bytes.decodeString("UTF-8"));
>
>                final CompletionStage<String> completionStage =
>                      dataBytes.runWith(sink, KafkaConnector.materializer);
>                // Blocking inside map while waiting for the body:
>                return completionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
>             });
>
>
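On the connectionFlow in the quoted code: the blocking toCompletableFuture().get(5, TimeUnit.SECONDS) inside map is itself a likely contributor to the timeouts, because it stalls the stream while waiting for the body.  The alternative is to compose the future instead of joining it.  A minimal plain-Java sketch of the idea (fetchBody here is a hypothetical stand-in for running the entity's dataBytes):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ComposeSketch {
    // Stand-in for materializing response.entity().getDataBytes() into a body string.
    static CompletionStage<String> fetchBody(String request) {
        return CompletableFuture.supplyAsync(() -> "body-of-" + request);
    }

    // Compose instead of blocking: no get()/join() inside the pipeline stage.
    static CompletionStage<String> handle(String request) {
        return fetchBody(request).thenApply(body -> body.toUpperCase());
    }

    public static void main(String[] args) {
        // join() here is only for the demo; in a stream you would keep the CompletionStage.
        System.out.println(handle("req-1").toCompletableFuture().join());
    }
}
```

In Akka Streams terms this corresponds to a mapAsync(parallelism, message -> ...) stage that returns the CompletionStage, rather than a blocking map, so backpressure is preserved while the body is being read.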
