Hi William,

pasting your code so I can comment.

  // the client connects to the server & gets the 6Mb back .. or almost 6Mb :/ !

  def client(system: ActorSystem, serverAddress: InetSocketAddress): Unit = {
    implicit val sys = system
    implicit val ec = system.dispatcher
    val settings = MaterializerSettings()
    val materializer = FlowMaterializer(settings)
    implicit val timeout = Timeout(5.seconds)

    val clientFuture = (IO(StreamTcp) ? StreamTcp.Connect(settings,
serverAddress))
    clientFuture.onSuccess {
      case clientBinding: StreamTcp.OutgoingTcpConnection =>

        var count = 0
        val decodingBuffer = new ArrayBuffer[Byte]()

        Flow(clientBinding.inputStream)
          .map(bytes => {

            count += bytes.size // THIS IS NOT SAFE: you are mutating
a closed-over non-volatile var and Akka Streams are concurrent

            // if you comment the next 3 lines, you'll get the 6Mb. If
not, some data is lost
            val array = new Array[Byte](bytes.size)
            decodingBuffer ++= array // THIS IS NOT SAFE: you are
mutating a closed-over non-threadsafe data structure from within a
concurrent application
            val bytebuffer = ByteBuffer.wrap(decodingBuffer.toArray)
// What is this line intending to achieve

          })
          .onComplete(materializer) {
          case Success(_) => {
            println(f"we received $count bytes") // THIS IS NOT SAFE:
you are reading a closed-over non-volatile var and Akka Streams are
concurrent
          }

          case Failure(e) =>
        }

    }

    clientFuture.onFailure {
      case e: Throwable =>
        println(s"Client could not connect to $serverAddress: ${e.getMessage}")
        system.shutdown()
    }

  }

}




Does this code work better? (Warning: I didn't compile it but you'll
get the idea)


  def client(system: ActorSystem, serverAddress: InetSocketAddress): Unit = {
    implicit val sys = system
    import sys.dispatcher
    val materializer = FlowMaterializer(MaterializerSettings())
    implicit val timeout = Timeout(5.seconds)

    (IO(StreamTcp) ? StreamTcp.Connect(settings, serverAddress)) onComplete {
      case Success(clientBinding: StreamTcp.OutgoingTcpConnection) =>
        Flow(clientBinding.inputStream)
          .fold(0l)( (count, bytes) => count + bytes.size )
          .toFuture(materializer) foreach { count => println(f"we
received $count bytes") }
        }
      case Failure(e) =>
        println(s"Client could not connect to $serverAddress: ${e.getMessage}")
        system.shutdown()
    }

  }




On Wed, Aug 13, 2014 at 2:18 AM, William Le Ferrand <[email protected]>
wrote:

> Dear List,
>
> I've facing a puzzling issue with (a slightly modified version of) the
> example code TcpEcho.scala. I'm sending 6mb with the server to the client:
>
>  - when I only count the bytes received, I get the right byte count back
> (6M)
>  - when I do something as simple as allocating an array inside the
> operation applied to each bytestring, I start loosing bytes
>
> I reduced the issue to a minimal code example:
> https://gist.github.com/williamleferrand/77b4e40787b593d1f00d . If you
> run it, you'll see that you don't get exactly 6M bytes back.
>
> Does anyone would have a clue of why I'm loosing these bytes?
>
> Thanks in advance,
>
> Best,
>
> William
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Cheers,
√

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to