Hi Jasper

I faced a similar problem just before my vacation (Naughty database that
might block in circuit breakers). I'm just an akka newbie but for what's it
worth here's what I did. I found that using withSyncCircuitBreaker in these
cases was a bad idea. I solved my problem by making the circuit breaker
return a future with a timeout instead and then piped that future further
down the line. That along with a transaction timeout in the driver solved
my issues. In your case I see that you want the actor to block but could
you not simply await the future (since it has a timeout it won't block
forever) and if you get a timeout send back Malformed.

BR

Måns



2014-08-06 17:12 GMT+02:00 Konrad Malawski <[email protected]>:

> Hi Jasper,
> as far as I can see this is not really a problem with akka, but with your
> threads getting stuck on that blocking `close` call?
> The circuit breaker (well, in general the JVM and threads) can't "kill"
> such operation, and you'll get stuck when using such blocking calls.
>
> Not sure how we can help with this specific problem here.
>
>
> On Wed, Aug 6, 2014 at 4:04 PM, Jasper <[email protected]> wrote:
>
>> Just noticed something interesting with YourKit... Here's an example of
>> an actor behaving properly (when all files are processed) :
>>
>> Sys-akka.actor.pinned-dispatcher-7 [RUNNABLE, IN_NATIVE]
>> java.net.SocketInputStream.read(byte[], int, int)SocketInputStream.java:
>> 122
>> org.postgresql.core.VisibleBufferedInputStream.readMore(int)
>> VisibleBufferedInputStream.java:143
>> org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int)
>> VisibleBufferedInputStream.java:112
>> org.postgresql.core.VisibleBufferedInputStream.read()Visible
>> BufferedInputStream.java:71
>> org.postgresql.core.PGStream.ReceiveChar()PGStream.java:269
>> org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(
>> CopyOperationImpl, boolean)QueryExecutorImpl.java:930
>> org.postgresql.core.v3.QueryExecutorImpl.endCopy(CopyInImpl)
>> QueryExecutorImpl.java:828
>> org.postgresql.core.v3.CopyInImpl.endCopy()CopyInImpl.java:59
>> org.postgresql.copy.CopyManager.copyIn(String, Reader, int)CopyManager.
>> java:145
>> org.postgresql.copy.CopyManager.copyIn(String, Reader)CopyManager.java:
>> 124
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$
>> applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8.
>> apply(Seq)FileProcessor.scala:98
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$
>> applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8.
>> apply(Object)FileProcessor.scala:84
>> scala.collection.Iterator$class.foreach(Iterator, Function1)Iterator.
>> scala:743
>> scala.collection.AbstractIterator.foreach(Function1)Iterator.scala:1174
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$
>> applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7.apply(
>> PushbackReader)FileProcessor.scala:84
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$
>> applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7.apply(Object)
>> FileProcessor.scala:82
>> xx.xxxx.util.Cleaning$.using(Object, Function1)Cleaning.scala:12
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$
>> applyOrElse$1$$anonfun$apply$mcV$sp$1.apply(Connection)FileProcessor.
>> scala:82
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$
>> applyOrElse$1$$anonfun$apply$mcV$sp$1.apply(Object)FileProcessor.scala:75
>> xx.xxxx.util.Cleaning$.using(Object, Function1)Cleaning.scala:12
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
>> apply$mcV$sp()FileProcessor.scala:75
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
>> apply()FileProcessor.scala:56
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
>> apply()FileProcessor.scala:56
>> akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
>> CircuitBreaker.scala:135
>> akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
>> CircuitBreaker.scala:135
>> akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State,
>> Function0)CircuitBreaker.scala:296
>> akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker.
>> scala:345
>> akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala
>> :354
>> akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker.
>> scala:113
>> akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)
>> CircuitBreaker.scala:135
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object,
>> Function1)FileProcessor.scala:55
>> akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor
>> .scala:465
>> xx.xxxx.actors.FileProcessor.aroundReceive(PartialFunction, Object)
>> FileProcessor.scala:27
>> akka.actor.ActorCell.receiveMessage(Object)ActorCell.scala:516
>> akka.actor.ActorCell.invoke(Envelope)ActorCell.scala:487
>> akka.dispatch.Mailbox.processMailbox(int, long)Mailbox.scala:238
>> akka.dispatch.Mailbox.run()Mailbox.scala:220
>> java.lang.Thread.run()Thread.java:744
>>
>> Now one who does not :
>>
>> Sys-akka.actor.pinned-dispatcher-6 [WAITING]
>> java.lang.Object.wait()Object.java:503
>> org.postgresql.core.v3.QueryExecutorImpl.waitOnLock()QueryExecutorImpl.
>> java:91
>> org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList,
>> ResultHandler, int, int, int)QueryExecutorImpl.java:228
>> org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(
>> Query)AbstractJdbc2Connection.java:808
>> org.postgresql.jdbc2.AbstractJdbc2Connection.rollback()Abstr
>> actJdbc2Connection.java:861
>> com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState()
>> ConnectionProxy.java:192
>> com.zaxxer.hikari.proxy.ConnectionProxy.close()ConnectionProxy.java:305
>> java.lang.reflect.Method.invoke(Object, Object[])Method.java:606
>> xx.xxxx.util.Cleaning$.using(Object, Function1)Cleaning.scala:14
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
>> apply$mcV$sp()FileProcessor.scala:75
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
>> apply()FileProcessor.scala:56
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.
>> apply()FileProcessor.scala:56
>> akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
>> CircuitBreaker.scala:135
>> akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply()
>> CircuitBreaker.scala:135
>> akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State,
>> Function0)CircuitBreaker.scala:296
>> akka.pattern.CircuitBreaker$Closed$.callThrough(Function0)CircuitBreaker.
>> scala:345
>> akka.pattern.CircuitBreaker$Closed$.invoke(Function0)CircuitBreaker.scala
>> :354
>> akka.pattern.CircuitBreaker.withCircuitBreaker(Function0)CircuitBreaker.
>> scala:113
>> akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0)
>> CircuitBreaker.scala:135
>> xx.xxxx.actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object,
>> Function1)FileProcessor.scala:55
>> akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object)Actor
>> .scala:465
>> xx.xxxx.actors.FileProcessor.aroundReceive(PartialFunction, Object)
>> FileProcessor.scala:27
>> akka.actor.ActorCell.receiveMessage(Object)ActorCell.scala:516
>> akka.actor.ActorCell.invoke(Envelope)ActorCell.scala:487
>> akka.dispatch.Mailbox.processMailbox(int, long)Mailbox.scala:238
>> akka.dispatch.Mailbox.run()Mailbox.scala:220
>> java.lang.Thread.run()Thread.java:744
>>
>>
>> Apparently it's blocking because of a lock on the database while closing
>> the connection (closeable.close() is at line 14). Which is weird because I
>> know their connections aren't closed thanks to Hikari log stats.
>>
>>
>>
>>   def using[C <: {def close()} , B](closeable: C)(getB: C => B): B =
>>     try {
>>       getB(closeable)
>>     } finally {
>>       closeable.close()
>>     }
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> hAkker @ Typesafe
>
> <http://typesafe.com>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to