Hi,
I'm currently trying to implement some kind of supervision in my app. Some
context first : I have a controller actor dispatching files to parse when
processor actor is done with his previous file. Of course I need to
bootstrap the processing chain when application starts. My file processors
are doing JDBC calls to insert the files content through a connection
acquired from a datasource holder actor (using HikariCP with
initializationFailFast enabled). Note my processors are currently *eight*
and they are, as well as the controller, attached to a pinned dispatcher.
What I need to do is ensuring that when the DB is down (or anything bad
happens while we're at it), all messages are buffered/kept until it's up
again to resume processing.
I tried to implement something with Circuit breaker and PeekMailbox (Not
stashing because I don't see the benefit, but already tried with it and
similar errors) but for some reason my system is blocked at the HalfOpen
state. This is probably because of my logic but I can't figure out why.
Here are some simplified snippets :
Controller actor
case Processed(pfile) =>
logger.debug(s"Controller received good processing of file
[${pfile.fileName}]")
fileHandler ! Processed(pfile)
processFiles(1)
case Unprocessed => processFiles(1) //sends a file to processors
through a router
File processor actor(s)
val breaker =
CircuitBreaker(context.system.scheduler,
maxFailures = 1,
callTimeout = 30 seconds,
resetTimeout = 10 seconds)
breaker.onOpen {
logger.info("Switching to Open state")
}.onHalfOpen {
logger.info("Switching to HalfOpen state")
}.onClose {
logger.info("Switching to Closed state")
}
case Forwarded(file) =>
logger.debug(s"FileParser${hashCode()} detected file
[${file.getName}]")
try{
breaker.withSyncCircuitBreaker {
val parsedFile = doParsingAndValidation()
parsedFile match {
case Success(parsedFile) =>
using((connectionFactory ? GetConnection).mapTo[Connection
].waitForResult) { conn =>
conn.setAutoCommit(false)
doJdbcStuff(conn)
controller ! Processed(parsedFile)
PeekMailboxExtension.ack()
}
case Failure(err) => controller ! Malformed(file, err)
}
}
} catch {
case ex: Exception =>
controller ! Unprocessed
logger.error(s"Couldn't process file [${file.getName}]. Will try
again later.")
}
*Here is a link to the logs <http://pastebin.com/dQavVcr3>*. You can see
It's stuck in Half Open state because for some reason it's not receiving
messages anymore. I've also got some weird stuff going on in my logs like :
16:43:06.161 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR
fr.catalina.actors.FileProcessor - Couldn't process file [MS7347RP.114].
Will try again later.
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG
fr.catalina.actors.FileProcessor - FileParser1834490077 detected file
[MS0134RP.114]
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR
fr.catalina.actors.FileProcessor - Couldn't process file [MS0134RP.114].
Will try again later.
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] DEBUG
fr.catalina.actors.FileProcessor - FileParser1834490077 detected file
[MS0134RP.114]
16:43:06.168 [catalinaSys-akka.actor.peek-dispatcher-6] ERROR
fr.catalina.actors.FileProcessor - Couldn't process file [MS0134RP.114].
Will try again later.
Why twice ?
Anyway, I'd appreciate any tips or advices pointing me in the right
direction. Thanks !
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.