This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 4d8aa26404 chore: Add PekkoManagedBlocker to reduce memory.
4d8aa26404 is described below
commit 4d8aa264045a0c210a63dafd4b06a4352d69340c
Author: He-Pin <[email protected]>
AuthorDate: Tue Jan 23 16:29:05 2024 +0800
chore: Add PekkoManagedBlocker to reduce memory.
---
.../apache/pekko/dispatch/ThreadPoolBuilder.scala | 24 +++++++++++++---------
1 file changed, 14 insertions(+), 10 deletions(-)
diff --git
a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
index c292be9de8..b11f9aa42a 100644
--- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
+++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala
@@ -173,19 +173,23 @@ object MonitorableThreadFactory {
val doNothing: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() { def uncaughtException(thread:
Thread, cause: Throwable) = () }
+ private class PekkoManagedBlocker[T](thunk: => T)
+ extends AtomicReference[Option[T]](None) with
ForkJoinPool.ManagedBlocker {
+ final override def block(): Boolean = {
+ set(Some(thunk))
+ true
+ }
+
+ final override def isReleasable: Boolean = get().isDefined // Exception
intended if None
+ }
+
private[pekko] class PekkoForkJoinWorkerThread(_pool: ForkJoinPool)
extends ForkJoinWorkerThread(_pool)
with BlockContext {
- override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
- val result = new AtomicReference[Option[T]](None)
- ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
- def block(): Boolean = {
- result.set(Some(thunk))
- true
- }
- def isReleasable = result.get.isDefined
- })
- result.get.get // Exception intended if None
+ final override def blockOn[T](thunk: => T)(implicit permission: CanAwait):
T = {
+ val blocker = new PekkoManagedBlocker(thunk)
+ ForkJoinPool.managedBlock(blocker)
+ blocker.get.get // Exception intended if None
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]