Have you tried it with smaller chunk sizes?
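
One possible explanation, offered as an assumption rather than a diagnosis: the transport between publisher and subscriber may not preserve ByteString boundaries, so the subscriber can receive slices that do not line up with the compressed blocks. If that is the case, a common workaround is to prefix each compressed block with its own length and reassemble complete blocks before decompressing. Below is a minimal sketch in plain Scala. `Framing`, `frame` and `Deframer` are hypothetical names, not part of Akka or lz4-java, and the wiring into the stream is left out.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper for illustration only; not an Akka or lz4-java API.
object Framing {

  // Prefix a payload with its length (4 bytes, little-endian).
  def frame(payload: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(4 + payload.length).order(ByteOrder.LITTLE_ENDIAN)
    buf.putInt(payload.length).put(payload)
    buf.array()
  }

  // Accumulates arbitrary slices of the byte stream and returns
  // every payload that has become complete, in arrival order.
  final class Deframer {
    private var pending: Array[Byte] = Array.emptyByteArray

    def push(chunk: Array[Byte]): List[Array[Byte]] = {
      pending = pending ++ chunk
      val complete = List.newBuilder[Array[Byte]]
      var done = false
      while (!done) {
        if (pending.length < 4) {
          done = true // length header not fully received yet
        } else {
          val len = ByteBuffer.wrap(pending, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt()
          if (pending.length < 4 + len) {
            done = true // payload not fully received yet
          } else {
            complete += pending.slice(4, 4 + len)
            pending = pending.drop(4 + len)
          }
        }
      }
      complete.result()
    }
  }
}
```

On the producer side each compressed block would go through `Framing.frame` before being sent. On the subscriber side every received slice would go through `Deframer.push`, and only the returned complete blocks would be passed to `decompress`.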

On Wed, Oct 8, 2014 at 12:34 PM, Nicolas Jozwiak <[email protected]>
wrote:

> The compress / decompress implementation works well (tested with a simple
> example without a stream).
>
> The ByteStrings I receive are not the same size as the ones produced by
> compression. For example, with a chunk size of 20000, a chunk is 19065 bytes
> after compression, but on the decompression side I receive 19070 bytes to
> decompress...
>
> Here is the stack trace:
>
> net.jpountz.lz4.LZ4Exception: Error decoding offset 10 of input buffer
>     at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:33) ~[lz4-1.2.0.jar:na]
>
> Nicolas.
>
> On Wednesday, October 8, 2014 at 11:14:53 UTC+2, Martynas Mickevičius wrote:
>>
>> Okay. That makes sense then.
>>
>> What kind of errors are you getting? Are the ByteStrings received in a
>> different order?
>>
>> Have you tested the compress/decompress implementation on its own?
>>
>> On Wed, Oct 8, 2014 at 12:08 PM, Nicolas Jozwiak <[email protected]>
>> wrote:
>>
>>> Hi Martynas,
>>>
>>>    Yes, it's a simplified example, because we use another branch to
>>> compute a digest.
>>>     Here is the complete code:
>>>
>>>     FlowGraph { implicit builder =>
>>>       val broadcast = Broadcast[ByteString]
>>>
>>>       // streamDigester extends Subscriber and updates a MessageDigest object
>>>       val digester = SubscriberSink(streamDigester)
>>>
>>>       val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
>>>
>>>       val compress = FlowFrom[ByteString].map(StreamOps.compress)
>>>       val out = SubscriberSink(outputStream)
>>>
>>>       in ~> broadcast ~> compress ~> out
>>>       broadcast ~> digester
>>>
>>>     }.run()
>>>
>>> Nicolas.
>>> On Wednesday, October 8, 2014 at 10:34:17 UTC+2, Martynas Mickevičius wrote:
>>>>
>>>> Hello Nicolas,
>>>>
>>>> is this the code you are running, or is it a simplified example?
>>>>
>>>> I am asking because you do not need a FlowGraph in a linear use case
>>>> like this. The Broadcast junction, which you create and use only once,
>>>> is meant to split the stream into two or more legs.
>>>>
>>>> Another issue in this code on the publisher side is that you are
>>>> grouping and mapping on the iterator and not on the Flow. I think you
>>>> should move these operations to the Flow.
>>>>
>>>> With the FlowGraph removed and the combinator operations moved to the
>>>> Flow, the producer side would then look like:
>>>>
>>>> // warning, not compiled
>>>> IteratorSource(byteStream).grouped(chunkSize).map(StreamOps.toByteString).produceTo(outputStream)
>>>>
>>>> On Tue, Oct 7, 2014 at 11:53 PM, Nicolas Jozwiak <[email protected]>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>>    I’m currently using Akka Streams to stream some big files and it
>>>>> works quite well :)
>>>>>
>>>>>    But in the FlowGraph stream I want to add some compression and
>>>>> decompression steps.
>>>>>
>>>>>    Compression seems to work correctly, but I get offset errors when
>>>>> the decompression is executed.
>>>>>
>>>>> To help, here is the code:
>>>>>
>>>>> On the Producer:
>>>>>
>>>>>   // ByteIterator implements Iterable[Byte] and reads the file byte by byte
>>>>>
>>>>>  val byteStream = new ByteIterator(buffStream)
>>>>>
>>>>>  FlowGraph { implicit builder =>
>>>>>
>>>>>     val broadcast = Broadcast[ByteString]
>>>>>
>>>>>     val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
>>>>>
>>>>>     val compress = FlowFrom[ByteString].map(StreamOps.compress)
>>>>>
>>>>>     val out = SubscriberSink(outputStream)
>>>>>
>>>>>     in ~> broadcast ~> compress ~> out
>>>>>
>>>>>   }.run()
>>>>>
>>>>>
>>>>>   val toByteString = { bytes: Seq[Byte] =>
>>>>>
>>>>>     val b = new ByteStringBuilder
>>>>>
>>>>>     b.sizeHint(bytes.length)
>>>>>
>>>>>     b ++= bytes
>>>>>
>>>>>     b.result()
>>>>>
>>>>>   }
>>>>>
>>>>>
>>>>> On the Subscriber:
>>>>>
>>>>>  val writeToFile = ForeachSink {
>>>>>
>>>>>     data: ByteString =>
>>>>>
>>>>>       channel.write(data.asByteBuffer)
>>>>>
>>>>>   }
>>>>>
>>>>>   val in = PublisherSource(publisher)
>>>>>
>>>>>   FlowGraph { implicit builder =>
>>>>>
>>>>>     val decompress = FlowFrom[ByteString].map(StreamOps.decompress)
>>>>>
>>>>>     val broadcast = Broadcast[ByteString]
>>>>>
>>>>>     in ~> decompress ~> broadcast ~> writeToFile
>>>>>
>>>>>   }.run()
>>>>>
>>>>> Compression and decompression are based on the LZ4 API:
>>>>>
>>>>> def compress(inputBuff: Array[Byte]): Array[Byte] = {
>>>>>
>>>>>     val inputSize = inputBuff.length
>>>>>
>>>>>     val lz4 = lz4factory.fastCompressor
>>>>>
>>>>>     val maxOutputSize = lz4.maxCompressedLength(inputSize)
>>>>>
>>>>>     val outputBuff = new Array[Byte](maxOutputSize + 4)
>>>>>
>>>>>     val outputSize = lz4.compress(inputBuff, 0, inputSize, outputBuff, 4, maxOutputSize)
>>>>>
>>>>>
>>>>>     outputBuff(0) = (inputSize & 0xff).toByte
>>>>>
>>>>>     outputBuff(1) = (inputSize >> 8 & 0xff).toByte
>>>>>
>>>>>     outputBuff(2) = (inputSize >> 16 & 0xff).toByte
>>>>>
>>>>>     outputBuff(3) = (inputSize >> 24 & 0xff).toByte
>>>>>
>>>>>     outputBuff.take(outputSize + 4)
>>>>>
>>>>>   }
>>>>>
>>>>>
>>>>>   def decompress(inputBuff: Array[Byte]): Array[Byte] = {
>>>>>
>>>>>     val size: Int = (inputBuff(0).asInstanceOf[Int] & 0xff) |
>>>>>
>>>>>       (inputBuff(1).asInstanceOf[Int] & 0xff) << 8 |
>>>>>
>>>>>       (inputBuff(2).asInstanceOf[Int] & 0xff) << 16 |
>>>>>
>>>>>       (inputBuff(3).asInstanceOf[Int] & 0xff) << 24
>>>>>
>>>>>     val lz4 = lz4factory.fastDecompressor()
>>>>>
>>>>>     val outputBuff = new Array[Byte](size)
>>>>>
>>>>>     lz4.decompress(inputBuff, 4, outputBuff, 0, size)
>>>>>
>>>>>     outputBuff
>>>>>
>>>>>   }
>>>>>
>>>>> I suspect the decompression step is executed with a byte array that
>>>>> is not the same size as the one produced by compression…
>>>>>
>>>>> Do you have any idea what could cause that?
>>>>>
>>>>> Thanks for your help.
>>>>>
>>>>> --
>>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>>>> ---
>>>>> You received this message because you are subscribed to the Google
>>>>> Groups "Akka User List" group.
>>>>> To unsubscribe from this group and stop receiving emails from it, send
>>>>> an email to [email protected].
>>>>> To post to this group, send email to [email protected].
>>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Martynas Mickevičius
>>>> Typesafe <http://typesafe.com/> – Reactive
>>>> <http://www.reactivemanifesto.org/> Apps on the JVM
>>>>



-- 
Martynas Mickevičius
Typesafe <http://typesafe.com/> – Reactive
<http://www.reactivemanifesto.org/> Apps on the JVM
