Hi and thanks for your attention.
Now I am using akka-stream 1.0 and scala 2.11.7 with the same results as 
previously.

New trial : on server side, I replace  "iterator" with 
"SynchronousFileSource" and I have an exception on client side :
     "IllegalResponseException: Response Content-Length 731408384 exceeds 
the configured limit of 8388608"
Exception is described at the the end of the mail.

Question : with SynchronousFileSource (and the existing code at server 
side), may I avoid the exception ?

    val f = new File(path)
    val responseEntity = HttpEntity(
        MediaTypes.`application/octet-stream`,
        f.length,
        SynchronousFileSource(f, chunkSize = 262144))
    HttpResponse(entity = responseEntity)


The full server side code is described at the the end of the mail.

Thanks for any help.


Exception at client side
==================
  akka.http.scaladsl.model.IllegalResponseException: Response 
Content-Length 731408384 exceeds the configured limit of 8388608
   at 
akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:76)
   at 
akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:73)
   at akka.stream.impl.fusing.Collect.onPush(Ops.scala:83)
   at akka.stream.impl.fusing.Collect.onPush(Ops.scala:78)
   at 
akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
   at 
akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
   at 
akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
   at 
akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
   at 
akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
   at 
akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
   at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
   at 
akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
   at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
   at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
   at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
...

Server.scala
===========
package app

import java.io.File
import java.io.FileInputStream
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem

import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.io.SynchronousFileSource
import akka.util.{ByteString, Timeout}

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal

import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

class ByteBufferIterator(buffer:ByteBuffer, chunkSize:Int) extends 
Iterator[ByteString] {
require(buffer.isReadOnly)
require(chunkSize > 0)

override def hasNext = buffer.hasRemaining

override def next(): ByteString = {
val size = chunkSize min buffer.remaining()
val temp = buffer.slice()
temp.limit(size)
buffer.position(buffer.position() + size)
ByteString(temp)
}
}

object Server extends App {

def map1(path: String) : MappedByteBuffer = {
val inputStream = new FileInputStream(path)
val channel: FileChannel = inputStream.getChannel();
val result = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size);
channel.close()
result
}


implicit val system = ActorSystem()

implicit val materializer = ActorMaterializer()
implicit val askTimeout: Timeout = 500.millis

import HttpMethods._

val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, uri, headers, _, _) =>
val path = "c:/tmp/movie.avi"
println("=========== path=" + path)
val result = Try {

//              //---- Solution 1 with iterator
//             // Set chunks
//             val mappedByteBuffer = map1(path)
//             val iterator = new ByteBufferIterator(mappedByteBuffer, 262144)  
  //4096)
//             var cnt = 0
//             val chunks = Source(() => iterator).map { x =>
//                if (cnt % 10000 == 0)
//                   this.koaLogger.info("Chunk of size " + x.size + "  cnt=" + 
cnt)
//                cnt += 1
//                HttpEntity.ChunkStreamPart(x)
//             }
//             // Set response
//             val responseEntity = 
HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks)
//             HttpResponse(entity = responseEntity)

               //---- Solution 2 with SynchronousFileSource
               val f = new File(path)
               val responseEntity = HttpEntity(
                  MediaTypes.`application/octet-stream`,
                  f.length,
                  SynchronousFileSource(f, chunkSize = 262144))
               HttpResponse(entity = responseEntity)


} recover {
case NonFatal(cause) =>
HttpResponse(StatusCodes.InternalServerError, entity = cause.getMessage)
}
result.get

case _: HttpRequest =>
HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
}


val serverSource: Source[Http.IncomingConnection, 
Future[Http.ServerBinding]] = Http(system).bind(interface = "localhost", 
port = 8080)

val bindingFuture: Future[Http.ServerBinding] = 
serverSource.to(Sink.foreach { connection =>
// foreach materializes the source
println("Accepted new connection from " + connection.remoteAddress)
// ... and then actually handle the connection
connection.handleWithSyncHandler(requestHandler)
}).run()

System.in.read()
system.shutdown()
system.awaitTermination()
}
 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to