Materialization is not async, so anything you do in mapMaterializedValue
will block the materialization. Simplest solution is to not run the logic
that feeds the OutputStream inside the materialization, but instead do that
by keeping the materialized value through your chain of stages, and getting
it as the returned value from run(). In some cases, maybe yours, you cannot
do that because you are passing the blueprint to some API that will do the
actual materialization, in that case you can instead fork off a
future/thread to do the writing, to let the materialization complete.
Something like this:
def legacySource =
StreamConverters.asOutputStream()
.mapMaterializedValue { outputstream =>
Future {
while(something) {
outputstream.write(data)
}
}(use-a-blocking-specific-dispatcher-here-as-write-is-blocking)
NotUsed
}
val route = get {
complete {
HttpEntity(
ContentTypes.`application/octet-stream`,
legacySource
)
}
}
--
Johan
Akka Team
On Wednesday, October 25, 2017 at 1:42:58 PM UTC+2, Rafał Krzewski wrote:
>
> Hi,
>
> I've tried using `StreamConverters.asOutputStream` and immediately run
> into the deadlock described in the documentation [1]
>
> My use case is that I am am creating Excel spreadsheet using Apache POI's
> streaming API [2] and then stream out the result using Akka HTTP.
>
> Under the hood POI is writing the row data into a temporary files on disk.
> When all data is ready, one can write out the result to an arbitrary
> `OutputStream` [3].
>
> When interoperating with Akka HTTP I need to provide a `Source` that is
> materialized by the framework. That's why my intuition was using
> `mapMaterializedValue` to provide the code making use of the `OutputStream`
> at the site when I'm creating the source. Unfortunately this does not work.
> I was able to work around that by writing out the spreadsheet to another
> temporary file, providing a `Source` to Akka HTTP side using `FileIO` and
> some additional song and dance to clean up the temporary files in all
> circumstances.
>
>
> I am wondering if there is a way I could use
> `StreamConverters.asOutputStream` correctly in this scenario that I don't
> see? Or maybe another kind of API would be necessary here? I'm thinking
> about something along the lines of:
>
>
> `asOutputStream(f: OutputStream => Done, writeTimeout: FiniteDuration =
> 5.seconds): Source[ByteString, Future[IOResult]]`
>
>
> `f` would be invoked after the stream is ready for writing. After `f`
> completes, the framework could ensure the stream is cleaned up properly.
> The returned `IOResult.status` could be used to check whether `f` completed
> normally. If `f` fails to complete within specified timeout, any further
> attempt to call methods on the `OutputStream` should result in an
> `IOException`. The problem I see is that `f` could get permanently blocked
> on some condition and thus steal a thread from
> `akka.stream.blocking-io-dispatcher` but I don't think there is any way to
> handled that on the JVM.
>
>
> I am not sure if the above is feasible but if it were I'm sure people
> would find it useful for interfacing with legacy code ;)
>
>
> Cheers,
>
> Rafał
>
>
> [1]
> https://doc.akka.io/docs/akka/current/scala/stream/stages-overview.html#additional-sink-and-source-converters
>
> [2] https://poi.apache.org/spreadsheet/how-to.html#sxssf
>
> [3]
> https://poi.apache.org/apidocs/org/apache/poi/xssf/streaming/SXSSFWorkbook.html#write-java.io.OutputStream-
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.