Hi,
I've managed to get something working, but I think there may be a
thread-safety problem, since I've created two graph stage logics that share
state and call into each other:

class MediaDetectorSink
    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[(String, Source[ByteString, _])]] {

  val in = Inlet[ByteString]("MediaDetectorSink.in")
  override val shape = SinkShape(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val promise = Promise[(String, Source[ByteString, _])]()

    val logic = new GraphStageLogic(shape) with InHandler {
      var detectionDone = false
      var buffer = ByteString.empty

      val sourceStage = new GraphStage[SourceShape[ByteString]] {
        val out = Outlet[ByteString]("MediaDetectorSource.out")
        override val shape = SourceShape(out)

        override def createLogic(inheritedAttributes: Attributes) =
          new GraphStageLogic(shape) with OutHandler {
            override def onPull() = {
              if (buffer.nonEmpty) {
                push(out, buffer)
                buffer = ByteString.empty
              } else if (canPull())
                pull(in)
              else
                completeStage()
            }

            setHandler(out, this)
          }
      }

      override def preStart() = pull(in)

      override def onPush() = {
        if (detectionDone)
          push(sourceStage.out, grab(in))
        else {
          buffer ++= grab(in)
          detect().fold {
            pull(in)
          } { mimeType =>
            promise.success((mimeType, Source.fromGraph(sourceStage)))
            completeStage()
          }
        }
      }

      def detect() = {
        detectionDone = true
        // ...Apache Tika stuff here...
        Some("my-mime-type")
      }

      def canPull() = !isClosed(in)

      setHandler(in, this)
    }

    (logic, promise.future)
  }
}

It's basically a sink which materializes a future containing the detected
MIME type and a source.
We run a source of `ByteString` through the sink, and it pulls enough
`ByteString` chunks to do its detection work. When the detection is done, it
completes the future with the detected MIME type and a source which emits
the bytes already consumed from the original source (for the detection work)
followed by the remaining bytes of the original source.
Could you give me some advice about that? I've read the source of
`BroadcastHub` for inspiration, but its use case is a bit different from
mine: it materializes a source which can then be materialized many times,
whereas I materialize one source and then materialize it only once. So
do I have to worry about thread safety here?
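
For comparison, here is a minimal sketch of the same idea without a custom
stage. It assumes the built-in `prefixAndTail` operator fits this use case and
that the first chunk carries enough bytes for detection; `PeekSketch`, `peek`
and `detect` are hypothetical names, and `detect` only stands in for the Tika
call:

```scala
import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object PeekSketch {
  // Hypothetical stand-in for the Apache Tika detection call.
  def detect(head: ByteString): String = "application/octet-stream"

  // Takes the first chunk off the stream, runs detection on it, and returns
  // the detected type together with a source that replays that chunk
  // followed by the untouched remainder of the original stream.
  def peek(bytes: Source[ByteString, _])(
      implicit mat: Materializer): Future[(String, Source[ByteString, NotUsed])] =
    bytes
      .prefixAndTail(1)
      .runWith(Sink.head)
      .map { case (prefix, tail) =>
        // prefix is empty if the original stream had no elements at all
        val head = prefix.foldLeft(ByteString.empty)(_ ++ _)
        (detect(head), Source.single(head).concat(tail))
      }(mat.executionContext)
}
```

The returned source can be materialized only once, and as far as I understand
it has to be materialized reasonably quickly, otherwise the upstream is
cancelled by the substream subscription timeout.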
Thank you,
Victor
On Monday, April 3, 2017 at 10:00:30 UTC+2, Victor wrote:
>
> Hi,
>
> I've created the following service:
>
>         Encrypted file
>               +
>               | Uploaded to
>               |
>         +-----v-----+
>         |           |
>         | Amazon S3 |
>         |           |
>         +---^---+---+   Receive resp and return
>             |   |       HttpResponse(entity=resp.entity.dataBytes.via(decryptionFlow))
>             |   |       +----------------------------+
>             |   +------->                            +---------> Decrypted file with unknown mime type
>             |           | Decrypt & download service |
>             +-----------+                            <---------+
>  Request encrypted file +----------------------------+ Request file
>
>
> It works like a charm: with a JVM constrained to 128 MB of memory I can
> download files of several GB without any problem (because I forward the
> source of bytes).
> But to be perfect, I would like to set the Content-Type header of the
> response containing the decrypted file, because now it's just binary data.
>
> To achieve that goal I want to use Apache Tika (a library used to
> detect MIME types). Apache Tika needs some bytes to do its work.
>
> So here is my problem: how can I peek at some data from the decrypted bytes,
> detect the mime type, and when done, return the HttpResponse with the
> correct header and with the entity set with the Source of decrypted bytes?
>
> val decryptedBytesSource = resp.entity.dataBytes.via(decryptionFlow)
> val contentType = ???? detectContentType(decryptedBytesSource) ????
> HttpResponse(entity = HttpEntity(contentType, decryptedBytesSource))
>
> I know the code above doesn't work as I can't consume the source twice,
> it's just to show what I want to do.
>
> Thank you in advance :)
> Victor
>
>