Hi all,
I am using Akka actors to download a file from S3. I create actors in a for
loop and pass them 'start_byte' and 'end_byte'.
Here is my code where I create the child actors:
class AWS_Path_Receiver_Actor extends Actor{
def receive = {
???
??? [some code]
???
val s3Client: AmazonS3 = new AmazonS3Client(awsCredentials)
val request: GetObjectRequest = new GetObjectRequest(bucketName,
keyName)
val s3object: S3Object = s3Client.getObject(request)
val totLength: Long = s3object.getObjectMetadata().getInstanceLength()
var actor_count: Long = 0
println("\nContent-Type: " +
s3object.getObjectMetadata().getContentType())
println("\nContent Length: " + totLength)
val step_size: Long = 1024*1024*2 // 2 MB
var start_byte: Long = 0
var end_byte: Long = step_size
var f_local_location: File = null
try{
f_local_location = ??? [some code]
???
??? [some code]
???
while(start_byte <= totLength && start_byte<=end_byte){
actor_count += 1
val download_actor = context.actorOf(Props(new Bytes_Fetcher_Actor(
awsCredentials, bucketName, keyName, f_local_location, start_byte, end_byte,
actor_count)),
name =
"BytesFetcherActor" + actor_count.toString)
download_actor ! FETCH_BYTES
start_byte = end_byte+1
end_byte = end_byte + step_size
if(end_byte >= totLength)
end_byte = totLength-1
}
???
??? [some code]
???
case COMPLETED =>
println(sender.path + " completed successfully")
try{
sender ! Kill
}
catch{
case _ =>
println("Actor killed")
}
case FAILED =>
println(sender.path + " failed")
case CANCELED =>
println("Download by " + sender.path + " was canceled")
}
}
/**
* Created by archit kapoor on 13/6/16.
*/
class Bytes_Fetcher_Actor(val awsCredentials: AWSCredentials, val bucketName:
String, val keyName: String, val f_local_location: File,
val start_byte: Long, val end_byte:
Long, val actor_count: Long) extends Actor {
def receive = {
case FETCH_BYTES =>
val actorName = self.path.name
val request: GetObjectRequest = new GetObjectRequest(bucketName,
keyName)
request.setRange(start_byte, end_byte) // This is where start_byte
and end_byte are used
val s3_file_ref = new File(f_local_location.getAbsolutePath()+"/" +
actor_count.toString+".txt")
try{
if(s3_file_ref.createNewFile()){
// Start downloading the File from AWS-S3
val txmgr: TransferManager = new TransferManager(awsCredentials)
val down: Download = txmgr.download(request, s3_file_ref)
var state = down.getState()
while(state != TransferState.Completed){
state = down.getState()
if (state == TransferState.Completed) {
println(actorName + " has completed the download")
sender ! COMPLETED
}
else if (state == TransferState.Canceled) {
sender ! CANCELED
}
else if (state == TransferState.Failed) {
sender ! FAILED
}
}
}
else{
// ERROR !!
sender() ! FAILED
}
}
catch{
case e: Throwable =>
sender ! FAILED
}
???
??? [some code]
???
}
}
So I expected the download to be multi-threaded, thought it would be fast.
After some time, it seems all the actors pause.
Instead if I download the same file in a single threaded way, it is much
faster.
The first 12 actors seem to instantly complete. It looks as if they work in
parallel, but the remaining actors seem to work in a serialized way. Why?
Are there any ways I can download a file of say 400-600 MB pretty fast
using Akka actors and AWS Java SDK?
How can this code be improved?
Does number '12' hold any significance in the context of Akka actors?
I have a 4 core processor, 16 GB RAM. JVM Heap size(min-4GB, max-6GB).
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.