Hello,
I’m currently using akka stream to stream some big files and it works
quit well :)
But in the FlowGraph stream I want to add some compression and
decompression steps.
Compression seems working correctly, but I’ve got some offset errors
when the decompression is executed.
To help, here is the code :
On the Producer :
// ByteIterator implements Iterable[Byte] and reads the file byte by byte
val byteStream = new ByteIterator(buffStream)
FlowGraph { implicit builder =>
val broadcast = Broadcast[ByteString]
val in =
IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
val compress = FlowFrom[ByteString].map(StreamOps.compress)
val out = SubscriberSink(outputStream)
in ~> broadcast ~> compress ~> out
}.run()
val toByteString = { bytes: Seq[Byte] =>
val b = new ByteStringBuilder
b.sizeHint(bytes.length)
b ++= bytes
b.result()
}
On the Subscriber :
val writeToFile = ForeachSink {
data: ByteString =>
channel.write(data.asByteBuffer)
}
val in = PublisherSource(publisher)
FlowGraph { implicit builder =>
val decompress = FlowFrom[ByteString].map(StreamOps.decompress)
val broadcast = Broadcast[ByteString]
in ~> decompress ~> broadcast ~> writeToFile
}.run()
Compression, decompression are based on LZ4 Api :
def compress(inputBuff: Array[Byte]): Array[Byte] = {
val inputSize = inputBuff.length
val lz4 = lz4factory.fastCompressor
val maxOutputSize = lz4.maxCompressedLength(inputSize)
val outputBuff = new Array[Byte](maxOutputSize + 4)
val outputSize = lz4.compress(inputBuff, 0, inputSize, outputBuff, 4,
maxOutputSize)
outputBuff(0) = (inputSize & 0xff).toByte
outputBuff(1) = (inputSize >> 8 & 0xff).toByte
outputBuff(2) = (inputSize >> 16 & 0xff).toByte
outputBuff(3) = (inputSize >> 24 & 0xff).toByte
outputBuff.take(outputSize + 4)
}
def decompress(inputBuff: Array[Byte]): Array[Byte] = {
val size: Int = (inputBuff(0).asInstanceOf[Int] & 0xff) |
(inputBuff(1).asInstanceOf[Int] & 0xff) << 8 |
(inputBuff(2).asInstanceOf[Int] & 0xff) << 16 |
(inputBuff(3).asInstanceOf[Int] & 0xff) << 24
val lz4 = lz4factory.fastDecompressor()
val outputBuff = new Array[Byte](size)
lz4.decompress(inputBuff, 4, outputBuff, 0, size)
outputBuff
}
In my mind, the decompression process is executed with a byte array which
has not the same size than the one compress…
Do you have any clues on that ?
Thanks for your help.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.