Hi Xavier,

This seems like a potential bug in the TcpStream rather than in the chunker.

This works for me:

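  // Imports assumed for akka-stream-experimental 0.5.
  import akka.actor.ActorSystem
  import akka.stream.{ FlowMaterializer, MaterializerSettings }
  import akka.stream.scaladsl.Flow
  import akka.util.ByteStringBuilder
  import com.typesafe.config.ConfigFactory
  import scala.concurrent.{ Await, ExecutionContext }
  import scala.concurrent.duration._

  def testNoNetwork(fileLength: Int, chunkSize: Int): Unit = {
    // Silence dead-letter logging so the test output stays readable.
    implicit val actorSystem = ActorSystem("test", ConfigFactory.parseString(
      """akka {
        |  log-dead-letters = 0
        |  log-dead-letters-during-shutdown = off
        |}""".stripMargin))
    val settings = MaterializerSettings()
    val materializer = FlowMaterializer(settings)

    // A fresh iterator of `fileLength` bytes on every call.
    def bytes = Iterator.fill(fileLength)(1.toByte)

    // Chunk the bytes into ByteStrings, then sum the chunk lengths.
    val testComplete = Flow(bytes).grouped(chunkSize).map(
        bytesIt => {
          val b = new ByteStringBuilder()
          b.sizeHint(chunkSize)
          b ++= bytesIt
          b.result()
        }
      ).fold(0)({ _ + _.length }).map({
        // Backticks match against the existing value of fileLength.
        case completed @ `fileLength` =>
          s"---- transfer OK: $fileLength/$completed, chunkSize: $chunkSize"
        case completed =>
          throw new RuntimeException(
            s"---- transfer KO. expected: $fileLength got: $completed diff: ${fileLength - completed}")
      }).toFuture(materializer).andThen({ case _ =>
        // Shut the actor system down whether the test passed or failed.
        actorSystem.shutdown()
        actorSystem.awaitTermination(3.seconds)
      })(ExecutionContext.global)

    println(Await.result(testComplete, atMost = 30.seconds))
  }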


scala> testNoNetwork(10 * 1024 * 1024, 3 * 1024 * 1024)
---- transfer OK: 10485760/10485760, chunkSize: 3145728
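
For reference, here is a minimal sketch in plain Scala (no Akka involved) of the chunk sizes that grouping should produce for these parameters. It assumes the chunker behaves like the standard library's `Iterator.grouped`; `chunkSizes` is a hypothetical helper name used only for illustration, not part of akka-stream.

```scala
// Hypothetical helper, not part of akka-stream: returns the size of each
// chunk produced by grouping `fileLength` bytes into groups of `chunkSize`.
def chunkSizes(fileLength: Int, chunkSize: Int): List[Int] =
  Iterator.fill(fileLength)(1.toByte)
    .grouped(chunkSize) // the last group may be shorter than chunkSize
    .map(_.length)
    .toList

val sizes = chunkSizes(10 * 1024 * 1024, 3 * 1024 * 1024)
println(sizes)     // three full 3 MB chunks plus a 1 MB remainder
println(sizes.sum) // the sum of the chunk sizes equals the file length
```

If the receiver's total is lower than this sum, the missing bytes were lost after chunking, which would point at the transport rather than the grouping step.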


On Wed, Aug 20, 2014 at 3:49 PM, Xavier Bucchiotty <[email protected]>
wrote:

> Hello Viktor,
>
> Thanks a lot for your answer. I've tried your solution and moved from
> version 0.3 to 0.5 of the akka-stream modules, but the problem is still
> there. I've extracted part of the code into a Gist:
> https://gist.github.com/xbucchiotty/3fab493607588dec0ac8
>
> Streaming a 10 MB file with a chunkSize of 3 MB fails. Some chunks have
> not arrived at the receiver by the time the flow on the receiver side
> completes. The larger the chunkSize, the larger the delta. 3 MB is a huge
> chunkSize for production; the problem was detected by ScalaCheck tests.
>
> To launch the test, I use sbt + consoleQuick. The very first time I launch
> the test, it fails. But if I try again in the same Scala REPL session, it
> passes!
>
>
> Another point that may be important (I can't see why yet): senders and
> receivers are actors in our code. On the flow of connections, we do
> something like:
> foreach{conn => self ! conn}
>
> In this actor context, the problem occurs with even smaller files.
>
>
> Any clue?
>
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√
