Hello Nicolas, is this the code you are running or is the the simplified example?
I am asking, because you do not need to use FlowGraph in a linear use case like this. Broadcast junction that you create and use only once is meant to split the stream into two or more legs. Another issue in this code on the publisher side is that you are grouping and mapping on iterator and not on the Flow. I think you should move these operation to the flow. The producer side with FlowGraph creation removed and combinator operations moved to the Flow then would look like: // warning, not compiled IteratorSource(byteStream).grouped(chunkSize).map(StreamOps.toByteString) .produceTo(outputStream) On Tue, Oct 7, 2014 at 11:53 PM, Nicolas Jozwiak <[email protected]> wrote: > Hello, > > I’m currently using akka stream to stream some big files and it works > quit well :) > > But in the FlowGraph stream I want to add some compression and > decompression steps. > > Compression seems working correctly, but I’ve got some offset errors > when the decompression is executed. > > To help, here is the code : > > On the Producer : > > // ByteIterator implements Iterable[Byte] and reads the file byte by > byte > > val byteStream = new ByteIterator(buffStream) > > FlowGraph { implicit builder => > > val broadcast = Broadcast[ByteString] > > val in = > IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString)) > > val compress = FlowFrom[ByteString].map(StreamOps.compress) > > val out = SubscriberSink(outputStream) > > in ~> broadcast ~> compress ~> out > > }.run() > > > val toByteString = { bytes: Seq[Byte] => > > val b = new ByteStringBuilder > > b.sizeHint(bytes.length) > > b ++= bytes > > b.result() > > } > > > On the Subscriber : > > val writeToFile = ForeachSink { > > data: ByteString => > > channel.write(data.asByteBuffer) > > } > > val in = PublisherSource(publisher) > > FlowGraph { implicit builder => > > val decompress = FlowFrom[ByteString].map(StreamOps.decompress) > > val broadcast = Broadcast[ByteString] > > in ~> decompress ~> broadcast ~> writeToFile > > }.run() > > Compression, decompression are based on LZ4 Api : > > def compress(inputBuff: Array[Byte]): Array[Byte] = { > > val inputSize = inputBuff.length > > val lz4 = lz4factory.fastCompressor > > val maxOutputSize = lz4.maxCompressedLength(inputSize) > > val outputBuff = new Array[Byte](maxOutputSize + 4) > > val outputSize = lz4.compress(inputBuff, 0, inputSize, outputBuff, 4, > maxOutputSize) > > > outputBuff(0) = (inputSize & 0xff).toByte > > outputBuff(1) = (inputSize >> 8 & 0xff).toByte > > outputBuff(2) = (inputSize >> 16 & 0xff).toByte > > outputBuff(3) = (inputSize >> 24 & 0xff).toByte > > outputBuff.take(outputSize + 4) > > } > > > def decompress(inputBuff: Array[Byte]): Array[Byte] = { > > val size: Int = (inputBuff(0).asInstanceOf[Int] & 0xff) | > > (inputBuff(1).asInstanceOf[Int] & 0xff) << 8 | > > (inputBuff(2).asInstanceOf[Int] & 0xff) << 16 | > > (inputBuff(3).asInstanceOf[Int] & 0xff) << 24 > > val lz4 = lz4factory.fastDecompressor() > > val outputBuff = new Array[Byte](size) > > lz4.decompress(inputBuff, 4, outputBuff, 0, size) > > outputBuff > > } > > In my mind, the decompression process is executed with a byte array which > has not the same size than the one compress… > > Do you have any clues on that ? > > Thanks for your help. > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: > http://doc.akka.io/docs/akka/current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at http://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Martynas Mickevičius Typesafe <http://typesafe.com/> – Reactive <http://www.reactivemanifesto.org/> Apps on the JVM -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
