Hi Nicolas,

this means that you are relying on “message boundaries”, which I put in quotes because TCP does not support this concept: you will receive bytes in arbitrary chunks, not in the chunks that you sent. Coupled with the lz4 state issue that Endre pointed out, this should explain what is going on.
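To make the framing point concrete, here is a minimal plain-Scala sketch (the names frame, FrameReassembler and onChunk are hypothetical, and neither akka nor lz4 is involved) of reassembling length-prefixed messages from arbitrarily re-chunked bytes, using the same 4-byte little-endian header layout as the compress() function quoted later in this thread:

```scala
import scala.collection.mutable.ArrayBuffer

// Frame a payload as: 4-byte little-endian length, then the payload bytes.
def frame(payload: Array[Byte]): Array[Byte] = {
  val n = payload.length
  Array[Byte](
    (n & 0xff).toByte,
    ((n >> 8) & 0xff).toByte,
    ((n >> 16) & 0xff).toByte,
    ((n >> 24) & 0xff).toByte
  ) ++ payload
}

// Accumulates incoming chunks and emits complete payloads, regardless of
// how the transport split the byte stream.
final class FrameReassembler {
  private val buf = ArrayBuffer.empty[Byte]

  def onChunk(chunk: Array[Byte]): Seq[Array[Byte]] = {
    buf ++= chunk
    val out = ArrayBuffer.empty[Array[Byte]]
    var done = false
    while (!done) {
      if (buf.length < 4) done = true // header not complete yet
      else {
        val n = (buf(0) & 0xff) | ((buf(1) & 0xff) << 8) |
                ((buf(2) & 0xff) << 16) | ((buf(3) & 0xff) << 24)
        if (buf.length < 4 + n) done = true // payload not complete yet
        else {
          out += buf.slice(4, 4 + n).toArray
          buf.remove(0, 4 + n) // drop the consumed frame
        }
      }
    }
    out.toSeq
  }
}
```

The receiver accumulates bytes until a complete frame is available, so it does not matter how TCP splits the stream.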
So, in summary: you’ll need to use the same lz4 instance throughout the whole compression, and likewise a single lz4 instance throughout the whole decompression. If the lz4 implementation actually works, then this will do it.

Regards,
Roland

On 8 Oct 2014, at 17:41, Nicolas Jozwiak <[email protected]> wrote:

> One thing I didn't mention is that on my local computer it works perfectly.
>
> The problem arises when I test over the network. How does akka stream handle TCP fragments?
>
> On Wednesday, 8 October 2014 16:13:24 UTC+2, drewhk wrote:
> Nicolas, I am not sure the code you wrote does what you want.
>
> First of all, reading something in byte by byte is highly inefficient; you should have your source as a stream of blocks, for example Source[ByteString]. The .grouped() will not help at all, because now you will get Seq[Byte], which means a huge amount of allocations.
>
> Using a compressor stage simply as a map seems suspicious, too. Are you certain that your compress or decompress function gives back something every time it is called? I.e. are there cases when the compressor/decompressor cannot provide any output because it still needs more input?
>
> Also, isn't this lz4 below stateful?
>
> val lz4 = lz4factory.fastDecompressor()
> val outputBuff = new Array[Byte](size)
> lz4.decompress(inputBuff, 4, outputBuff, 0, size)
>
> If it is stateful then you shouldn't create a new instance every time. If it is not stateful, then I don't know what this library does -- does it need the whole file to be in memory to decompress?
>
> I don't know the library you used, but shouldn't this line
>
> lz4.decompress(inputBuff, 4, outputBuff, 0, size)
>
> return the count of the actually written bytes? Looking at this (https://github.com/jpountz/lz4-java/blob/master/src/java/net/jpountz/lz4/LZ4JNIFastDecompressor.java) it does, which means you cannot just return the whole Array.
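As a side note on the return-count issue Endre raises, the pattern of sizing the output by the count the call returns can be sketched with the JDK's own Deflater/Inflater standing in for lz4 (the lz4-java calls are analogous but not shown here; compressBlock and decompressBlock are hypothetical helpers, not code from this thread):

```scala
import java.util.zip.{Deflater, Inflater}
import java.util.Arrays

// Both calls below return the number of bytes actually written, and the
// output array must be truncated to that count rather than returned whole.

def compressBlock(input: Array[Byte]): Array[Byte] = {
  val d = new Deflater()
  d.setInput(input)
  d.finish()
  val out = new Array[Byte](input.length + 64) // generous worst-case slack
  val written = d.deflate(out)                 // actual bytes written
  d.end()
  Arrays.copyOf(out, written)                  // truncate; don't return the whole array
}

def decompressBlock(input: Array[Byte], originalSize: Int): Array[Byte] = {
  val inf = new Inflater()
  inf.setInput(input)
  val out = new Array[Byte](originalSize)
  val written = inf.inflate(out)               // again: actual bytes written
  inf.end()
  Arrays.copyOf(out, written)
}
```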
>
> If you can package up a small failing sample (not just snippets) we can then see if this is a bug or not.
>
> -Endre
>
> On Wed, Oct 8, 2014 at 3:57 PM, Nicolas Jozwiak <[email protected]> wrote:
> Yes, we close the outputstream at the end.
>
> I've continued to do some tests with a small chunk size and it passes only one time... When I retry I get the above error => (Expected file length: '114541', actual: '95541') or a Java OutOfMemoryError...
>
> Nicolas.
>
> On Wednesday, 8 October 2014 15:48:42 UTC+2, drewhk wrote:
> Do you properly close/flush the outputstream?
>
> -Endre
>
> On Wed, Oct 8, 2014 at 3:45 PM, Nicolas Jozwiak <[email protected]> wrote:
> I've just tried with a smaller chunk size (1000), and now I get a different file size at the end => (Expected file length: '114541', actual: '95541'). Weird...
>
> Nicolas
>
> On Wednesday, 8 October 2014 11:37:42 UTC+2, Martynas Mickevičius wrote:
> Have you tried it with a smaller chunk size?
>
> On Wed, Oct 8, 2014 at 12:34 PM, Nicolas Jozwiak <[email protected]> wrote:
> The compress/decompress implementation works well (tested with a simple example without streams).
>
> The ByteStrings I receive are not the same size as the ones produced by compression. For example, with a chunk size of 20000, a chunk will be 19065 bytes after compression, but on decompression I will receive 19070 bytes to decompress...
>
> Here is the stack trace:
>
> net.jpountz.lz4.LZ4Exception: Error decoding offset 10 of input buffer
>     at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:33) ~[lz4-1.2.0.jar:na]
>
> Nicolas.
>
> On Wednesday, 8 October 2014 11:14:53 UTC+2, Martynas Mickevičius wrote:
> Okay. That makes sense then.
>
> What kind of errors are you getting? Are the ByteStrings received in a different order?
>
> Have you tested the compress/decompress method implementation?
>
> On Wed, Oct 8, 2014 at 12:08 PM, Nicolas Jozwiak <[email protected]> wrote:
> Hi Martynas,
>
> Yes, it's a simplified example, because we use another branch to do some digester processing.
> Here is the complete code:
>
> FlowGraph { implicit builder =>
>   val broadcast = Broadcast[ByteString]
>
>   // streamDigester extends Subscriber and updates a MessageDigest object
>   val digester = SubscriberSink(streamDigester)
>
>   val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
>
>   val compress = FlowFrom[ByteString].map(StreamOps.compress)
>   val out = SubscriberSink(outputStream)
>
>   in ~> broadcast ~> compress ~> out
>   broadcast ~> digester
> }.run()
>
> Nicolas.
>
> On Wednesday, 8 October 2014 10:34:17 UTC+2, Martynas Mickevičius wrote:
> Hello Nicolas,
>
> is this the code you are running, or is it a simplified example?
>
> I am asking because you do not need to use FlowGraph in a linear use case like this. The Broadcast junction that you create and use only once is meant to split the stream into two or more legs.
>
> Another issue in this code on the publisher side is that you are grouping and mapping on the iterator and not on the Flow. I think you should move these operations to the Flow.
>
> The producer side, with the FlowGraph creation removed and the combinator operations moved to the Flow, would then look like:
>
> // warning, not compiled
> IteratorSource(byteStream).grouped(chunkSize).map(StreamOps.toByteString).produceTo(outputStream)
>
> On Tue, Oct 7, 2014 at 11:53 PM, Nicolas Jozwiak <[email protected]> wrote:
> Hello,
>
> I’m currently using akka stream to stream some big files and it works quite well :)
>
> But in the FlowGraph stream I want to add compression and decompression steps.
>
> Compression seems to be working correctly, but I’ve got some offset errors when the decompression is executed.
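Regarding the streamDigester leg in the graph quoted above: a MessageDigest accepts incremental updates, so the digest leg needs no knowledge of where the chunk boundaries fall. A minimal stand-in sketch (digestChunks is a hypothetical helper, not the streamDigester from this thread):

```scala
import java.security.MessageDigest

// Feed the stream into the digest one chunk at a time; the result is
// identical to digesting the whole byte array at once.
def digestChunks(chunks: Iterator[Array[Byte]]): Array[Byte] = {
  val md = MessageDigest.getInstance("SHA-256")
  chunks.foreach(c => md.update(c)) // one update per chunk
  md.digest()
}
```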
>
> To help, here is the code.
>
> On the producer:
>
> // ByteIterator implements Iterable[Byte] and reads the file byte by byte
> val byteStream = new ByteIterator(buffStream)
>
> FlowGraph { implicit builder =>
>   val broadcast = Broadcast[ByteString]
>   val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
>   val compress = FlowFrom[ByteString].map(StreamOps.compress)
>   val out = SubscriberSink(outputStream)
>
>   in ~> broadcast ~> compress ~> out
> }.run()
>
> val toByteString = { bytes: Seq[Byte] =>
>   val b = new ByteStringBuilder
>   b.sizeHint(bytes.length)
>   b ++= bytes
>   b.result()
> }
>
> On the subscriber:
>
> val writeToFile = ForeachSink { data: ByteString =>
>   channel.write(data.asByteBuffer)
> }
>
> val in = PublisherSource(publisher)
>
> FlowGraph { implicit builder =>
>   val decompress = FlowFrom[ByteString].map(StreamOps.decompress)
>   val broadcast = Broadcast[ByteString]
>
>   in ~> decompress ~> broadcast ~> writeToFile
> }.run()
>
> Compression and decompression are based on the LZ4 API:
>
> def compress(inputBuff: Array[Byte]): Array[Byte] = {
>   val inputSize = inputBuff.length
>   val lz4 = lz4factory.fastCompressor
>   val maxOutputSize = lz4.maxCompressedLength(inputSize)
>   val outputBuff = new Array[Byte](maxOutputSize + 4)
>   val outputSize = lz4.compress(inputBuff, 0, inputSize, outputBuff, 4, maxOutputSize)
>
>   outputBuff(0) = (inputSize & 0xff).toByte
>   outputBuff(1) = (inputSize >> 8 & 0xff).toByte
>   outputBuff(2) = (inputSize >> 16 & 0xff).toByte
>   outputBuff(3) = (inputSize >> 24 & 0xff).toByte
>   outputBuff.take(outputSize + 4)
> }
>
> def decompress(inputBuff: Array[Byte]): Array[Byte] = {
>   val size: Int = (inputBuff(0).asInstanceOf[Int] & 0xff) |
>     (inputBuff(1).asInstanceOf[Int] & 0xff) << 8 |
>     (inputBuff(2).asInstanceOf[Int] & 0xff) << 16 |
>     (inputBuff(3).asInstanceOf[Int] & 0xff) << 24
>   val lz4 = lz4factory.fastDecompressor()
>   val outputBuff = new Array[Byte](size)
>   lz4.decompress(inputBuff, 4, outputBuff, 0, size)
>   outputBuff
> }
>
> It seems to me the decompression is being run on a byte array that does not have the same size as the one that was compressed…
>
> Do you have any clues on that?
>
> Thanks for your help.
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
> --
> Martynas Mickevičius
> Typesafe – Reactive Apps on the JVM
Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
