Hi and thanks for your attention.
Now I am using akka-stream 1.0 and scala 2.11.7 with the same results as
previously.
New trial : on server side, I replace "iterator" with
"SynchronousFileSource" and I have an exception on client side :
"IllegalResponseException: Response Content-Length 731408384 exceeds
the configured limit of 8388608"
Exception is described at the the end of the mail.
Question : with SynchronousFileSource (and the existing code at server
side), may I avoid the exception ?
val f = new File(path)
val responseEntity = HttpEntity(
MediaTypes.`application/octet-stream`,
f.length,
SynchronousFileSource(f, chunkSize = 262144))
HttpResponse(entity = responseEntity)
The full server side code is described at the the end of the mail.
Thanks for any help.
Exception at client side
==================
akka.http.scaladsl.model.IllegalResponseException: Response
Content-Length 731408384 exceeds the configured limit of 8388608
at
akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:76)
at
akka.http.impl.engine.client.OutgoingConnectionBlueprint$$anonfun$2.applyOrElse(OutgoingConnectionBlueprint.scala:73)
at akka.stream.impl.fusing.Collect.onPush(Ops.scala:83)
at akka.stream.impl.fusing.Collect.onPush(Ops.scala:78)
at
akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.run(Interpreter.scala:436)
at
akka.stream.impl.fusing.OneBoundedInterpreter$State$class.progress(Interpreter.scala:245)
at
akka.stream.impl.fusing.OneBoundedInterpreter$$anon$1.progress(Interpreter.scala:434)
at
akka.stream.impl.fusing.OneBoundedInterpreter.akka$stream$impl$fusing$OneBoundedInterpreter$$execute(Interpreter.scala:580)
at
akka.stream.impl.fusing.OneBoundedInterpreter$State$class.execute(Interpreter.scala:241)
at
akka.stream.impl.fusing.OneBoundedInterpreter$EntryState.execute(Interpreter.scala:666)
at akka.stream.stage.AbstractStage.enterAndPush(Stage.scala:66)
at
akka.stream.impl.fusing.BatchingActorInputBoundary$$anonfun$upstreamRunning$1.applyOrElse(ActorInterpreter.scala:157)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.stream.impl.SubReceive.apply(Transfer.scala:16)
at akka.stream.impl.SubReceive.apply(Transfer.scala:12)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
...
Server.scala
===========
package app
import java.io.File
import java.io.FileInputStream
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import java.nio.channels.FileChannel
import java.nio.file.{Path, Paths, StandardOpenOption}
import java.nio.{ByteBuffer, MappedByteBuffer}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.io.SynchronousFileSource
import akka.util.{ByteString, Timeout}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NonFatal
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
class ByteBufferIterator(buffer:ByteBuffer, chunkSize:Int) extends
Iterator[ByteString] {
require(buffer.isReadOnly)
require(chunkSize > 0)
override def hasNext = buffer.hasRemaining
override def next(): ByteString = {
val size = chunkSize min buffer.remaining()
val temp = buffer.slice()
temp.limit(size)
buffer.position(buffer.position() + size)
ByteString(temp)
}
}
object Server extends App {
def map1(path: String) : MappedByteBuffer = {
val inputStream = new FileInputStream(path)
val channel: FileChannel = inputStream.getChannel();
val result = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size);
channel.close()
result
}
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val askTimeout: Timeout = 500.millis
import HttpMethods._
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, uri, headers, _, _) =>
val path = "c:/tmp/movie.avi"
println("=========== path=" + path)
val result = Try {
// //---- Solution 1 with iterator
// // Set chunks
// val mappedByteBuffer = map1(path)
// val iterator = new ByteBufferIterator(mappedByteBuffer, 262144)
//4096)
// var cnt = 0
// val chunks = Source(() => iterator).map { x =>
// if (cnt % 10000 == 0)
// this.koaLogger.info("Chunk of size " + x.size + " cnt=" +
cnt)
// cnt += 1
// HttpEntity.ChunkStreamPart(x)
// }
// // Set response
// val responseEntity =
HttpEntity.Chunked(MediaTypes.`application/octet-stream`, chunks)
// HttpResponse(entity = responseEntity)
//---- Solution 2 with SynchronousFileSource
val f = new File(path)
val responseEntity = HttpEntity(
MediaTypes.`application/octet-stream`,
f.length,
SynchronousFileSource(f, chunkSize = 262144))
HttpResponse(entity = responseEntity)
} recover {
case NonFatal(cause) =>
HttpResponse(StatusCodes.InternalServerError, entity = cause.getMessage)
}
result.get
case _: HttpRequest =>
HttpResponse(StatusCodes.NotFound, entity = "Unknown resource!")
}
val serverSource: Source[Http.IncomingConnection,
Future[Http.ServerBinding]] = Http(system).bind(interface = "localhost",
port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
serverSource.to(Sink.foreach { connection =>
// foreach materializes the source
println("Accepted new connection from " + connection.remoteAddress)
// ... and then actually handle the connection
connection.handleWithSyncHandler(requestHandler)
}).run()
System.in.read()
system.shutdown()
system.awaitTermination()
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.