Hello,

   I’m currently using Akka Streams to stream some big files, and it works 
quite well :) 

   But in the FlowGraph I want to add some compression and 
decompression steps.

   Compression seems to work correctly, but I get some offset errors 
when the decompression is executed. 

To help, here is the code:

On the Producer:

 // ByteIterator implements Iterable[Byte] and reads the file byte by byte
 val byteStream = new ByteIterator(buffStream)

 FlowGraph { implicit builder =>
    val broadcast = Broadcast[ByteString]
    val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
    val compress = FlowFrom[ByteString].map(StreamOps.compress)
    val out = SubscriberSink(outputStream)

    in ~> broadcast ~> compress ~> out
  }.run()


  val toByteString = { bytes: Seq[Byte] =>
    val b = new ByteStringBuilder
    b.sizeHint(bytes.length)
    b ++= bytes
    b.result()
  }


On the Subscriber:

 val writeToFile = ForeachSink { data: ByteString =>
    channel.write(data.asByteBuffer)
  }

  val in = PublisherSource(publisher)

  FlowGraph { implicit builder =>
    val decompress = FlowFrom[ByteString].map(StreamOps.decompress)
    val broadcast = Broadcast[ByteString]

    in ~> decompress ~> broadcast ~> writeToFile
  }.run()

Compression and decompression are based on the LZ4 API:

def compress(inputBuff: Array[Byte]): Array[Byte] = {
    val inputSize = inputBuff.length
    val lz4 = lz4factory.fastCompressor
    val maxOutputSize = lz4.maxCompressedLength(inputSize)
    val outputBuff = new Array[Byte](maxOutputSize + 4)
    val outputSize =
      lz4.compress(inputBuff, 0, inputSize, outputBuff, 4, maxOutputSize)

    // 4-byte little-endian header carrying the uncompressed size
    outputBuff(0) = (inputSize & 0xff).toByte
    outputBuff(1) = (inputSize >> 8 & 0xff).toByte
    outputBuff(2) = (inputSize >> 16 & 0xff).toByte
    outputBuff(3) = (inputSize >> 24 & 0xff).toByte

    outputBuff.take(outputSize + 4)
  }


  def decompress(inputBuff: Array[Byte]): Array[Byte] = {
    // Read back the 4-byte little-endian header with the uncompressed size
    val size: Int = (inputBuff(0).toInt & 0xff) |
      (inputBuff(1).toInt & 0xff) << 8 |
      (inputBuff(2).toInt & 0xff) << 16 |
      (inputBuff(3).toInt & 0xff) << 24

    val lz4 = lz4factory.fastDecompressor()
    val outputBuff = new Array[Byte](size)
    lz4.decompress(inputBuff, 4, outputBuff, 0, size)
    outputBuff
  }
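
For what it’s worth, the 4-byte little-endian header logic can be checked in isolation, with no LZ4 involved (`encodeLength` and `decodeLength` are just illustrative names mirroring the code above, not part of my actual code):

```scala
// Illustrative helpers mirroring the header logic in compress/decompress.
// encodeLength prepends a 4-byte little-endian length to a payload;
// decodeLength reads that length back from the start of a framed array.
def encodeLength(payload: Array[Byte]): Array[Byte] = {
  val n = payload.length
  val out = new Array[Byte](n + 4)
  out(0) = (n & 0xff).toByte
  out(1) = (n >> 8 & 0xff).toByte
  out(2) = (n >> 16 & 0xff).toByte
  out(3) = (n >> 24 & 0xff).toByte
  System.arraycopy(payload, 0, out, 4, n)
  out
}

def decodeLength(framed: Array[Byte]): Int =
  (framed(0).toInt & 0xff) |
  (framed(1).toInt & 0xff) << 8 |
  (framed(2).toInt & 0xff) << 16 |
  (framed(3).toInt & 0xff) << 24
```

This round-trips any non-negative Int, including sizes above 255 that need more than one header byte, so the header itself does not look like the culprit.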

In my mind, the decompression is executed on a byte array that does not 
have the same size as the one that came out of compression…
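
That suspicion is easy to reproduce without Akka or LZ4: concatenate two length-prefixed frames and re-chunk the bytes at an arbitrary boundary, and the second chunk no longer starts with a valid header (`frame` is an illustrative helper mirroring the header format above):

```scala
// Illustrative sketch: each frame is a 4-byte little-endian length header
// followed by the payload, like the compress output above.
def frame(payload: Array[Byte]): Array[Byte] = {
  val n = payload.length
  Array[Byte](
    (n & 0xff).toByte, (n >> 8 & 0xff).toByte,
    (n >> 16 & 0xff).toByte, (n >> 24 & 0xff).toByte) ++ payload
}

val a = frame(Array.fill[Byte](10)(1))   // 14 bytes on the wire
val b = frame(Array.fill[Byte](10)(2))   // 14 bytes on the wire
val wire = a ++ b                        // the byte stream the subscriber sees

// Re-chunk at a boundary (9) that does not match the frame boundary (14):
val chunks = wire.grouped(9).toList

// chunks(0) starts with a real header byte (10), but chunks(1) starts in
// the middle of the first frame's payload, so its first four bytes are
// payload bytes, not a length header; decompress would read a bogus size.
```

So if the subscriber side receives ByteString chunks whose boundaries are not the compressed-frame boundaries, decompress is handed arrays it cannot interpret.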

Do you have any clues on that?

Thanks for your help. 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
