Hi Petr,
Firstly - do not use the io.Source + getLines trick to get lines from a
File, it's horribly slow :-)
Instead use the SynchronousFileSource*as shown in
stream-io.html#Streaming_File_IO
<http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/stream-io.html#Streaming_File_IO>
.
It's much faster and also takes care of closing the File properly in case
of completion or failure.
You'll want to use the parseLines cookbook recipe for the time being for
parsing lines:
http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html

We already have a built-in lines parser in the works and it will be
provided in the next RC of Akka Streams:
https://github.com/akka/akka/pull/17446

Secondly, more generally speaking in cases like these you can broadcast to
2 sinks, one of them being an onComplete sink and then you
can use this sink as the "on completion do this and that" signal.


* Synchronous because it's using blocking API, because non-blocking API is
not available on Java 6 which Akka Streams have to support currently. We
have a ticket for the async file source
https://github.com/akka/akka/issues/17269 and it will be provided later on
(once Akka Streams join Akka 2.4 with requiring Java 8).

On Tue, May 26, 2015 at 4:33 PM, Petr Janda <[email protected]> wrote:

> Hi guys,
>
> I was wondering what is the best practice used in Akka Streams to clean up
> opened resources. My example use case is the stream reading lines from the
> file, streaming them to Apache Kafka (using
> https://github.com/softwaremill/reactive-kafka subscriber). See the
> example code here:
>
> val file = io.Source.fromFile(path)
> val lines = file.getLines()
> val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost =
> "localhost:2181")
> val subscriber = kafka.publish("uppercaseStrings", "groupName", new
> StringEncoder())
>
> Source(() => lines)
>   .map(_.toUpperCase)
>   .to(Sink(subscriber))
>   .run()
>
>
> As soon as the flow is done I would like to cleanup and close the file
> input stream. One way I used to go about that is to have Sink.onComplete or
> Sink.fold although this is not viable here. Also, ideally I would like to
> close the file in case of any error.
>
> Could you advice on any idiomatic way to do this?
>
> Thanks,
> ~Petr
>
>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
Konrad 'ktoso' Malawski
Akka <http://akka.io/> @ Typesafe <http://typesafe.com/>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to