This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6f11d48b2 Memory leak in `akka.actor.LocalActorRef` (#5442)
6f11d48b2 is described below
commit 6f11d48b216a01b7c6fa3342d2b8b972109106f7
Author: Yevhen Sentiabov <[email protected]>
AuthorDate: Fri Sep 29 08:33:49 2023 -0500
Memory leak in `akka.actor.LocalActorRef` (#5442)
* - Replaced the usage of deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage (see the queue API sketch below)
  - Added proper clean-up of materialized resources to prevent memory leaks for long-running streams
* - Added an execution context to the PoolingRestClient so that custom implementations can propagate it correctly
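For reviewers, a minimal sketch of the API change behind the first bullet, assuming Akka Streams 2.6.x; the element type, buffer size, and sink are illustrative, not the ones used by PoolingRestClient:

    import akka.actor.ActorSystem
    import akka.stream.{BoundedSourceQueue, OverflowStrategy, QueueOfferResult}
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.concurrent.Future

    object QueueApiSketch {
      implicit val system: ActorSystem = ActorSystem("sketch")

      // Deprecated API: offer() answers asynchronously with a Future, and
      // OverflowStrategy.dropNew is deprecated in favour of the bounded queue below.
      val oldQueue = Source
        .queue[Int](16, OverflowStrategy.dropNew)
        .toMat(Sink.ignore)(Keep.left)
        .run()
      val oldOffer: Future[QueueOfferResult] = oldQueue.offer(1)

      // Replacement: Source.queue(bufferSize) materializes a BoundedSourceQueue
      // whose offer() returns a QueueOfferResult synchronously, so callers can
      // pattern match without scheduling another Future.
      val newQueue: BoundedSourceQueue[Int] = Source
        .queue[Int](16)
        .toMat(Sink.ignore)(Keep.left)
        .run()
      newQueue.offer(1) match {
        case QueueOfferResult.Enqueued    => // accepted
        case QueueOfferResult.Dropped     => // buffer full, element rejected
        case QueueOfferResult.QueueClosed => // queue already completed
        case QueueOfferResult.Failure(t)  => throw t
      }
    }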
---
.../core/containerpool/AkkaContainerClient.scala | 4 +--
.../logging/ElasticSearchLogStore.scala | 2 +-
.../logging/ElasticSearchRestClient.scala | 11 +++----
.../core/database/CouchDbRestClient.scala | 6 ++--
.../apache/openwhisk/http/PoolingRestClient.scala | 35 ++++++++++++++--------
5 files changed, 34 insertions(+), 24 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 8dbc41f8a..03dcf7833 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -70,8 +70,8 @@ protected class AkkaContainerClient(
port: Int,
timeout: FiniteDuration,
queueSize: Int,
- retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
- extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
+ retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem, ec: ExecutionContext)
+ extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))(as, ec)
with ContainerClient {
def close() = shutdown()
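As a usage note for the execution-context bullet: subclasses now choose their ExecutionContext explicitly instead of silently inheriting system.dispatcher. A hypothetical subclass (the name and arguments are illustrative only) would wire it like this:

    import akka.actor.ActorSystem
    import scala.concurrent.ExecutionContext
    import org.apache.openwhisk.http.PoolingRestClient

    // Hypothetical subclass: whatever ExecutionContext is implicitly in scope is
    // forwarded to PoolingRestClient's second parameter list, mirroring the
    // AkkaContainerClient change above.
    class ExampleRestClient(host: String, port: Int)(implicit system: ActorSystem, ec: ExecutionContext)
      extends PoolingRestClient("https", host, port, 16 * 1024)(system, ec)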
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
index 65033d3db..7c2faa908 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -79,7 +79,7 @@ class ElasticSearchLogStore(
elasticSearchConfig.protocol,
elasticSearchConfig.host,
elasticSearchConfig.port,
- httpFlow)
+ httpFlow)(system, system.dispatcher)
private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala
index edd7d857f..04343dcdd 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -17,20 +17,16 @@
package org.apache.openwhisk.core.containerpool.logging
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Either, Try}
-
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
import akka.http.scaladsl.model.headers.Accept
import akka.stream.scaladsl.Flow
-import scala.concurrent.Promise
import scala.util.Try
-
import spray.json._
-
import org.apache.openwhisk.http.PoolingRestClient
import org.apache.openwhisk.http.PoolingRestClient._
@@ -154,8 +150,9 @@ class ElasticSearchRestClient(
host: String,
port: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
- implicit system: ActorSystem)
- extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
+ implicit system: ActorSystem,
+ ec: ExecutionContext)
+ extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow)(system, ec) {
import ElasticSearchJsonProtocol._
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala
index ff0e74b73..823a1679a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala
@@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future}
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
implicit system: ActorSystem,
logging: Logging)
- extends PoolingRestClient(protocol, host, port, 16 * 1024) {
+ extends PoolingRestClient(protocol, host, port, 16 * 1024)(
+ system,
+ system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
- protected implicit override val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
+ protected implicit val context: ExecutionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
// Headers common to all requests.
protected val baseHeaders: List[HttpHeader] =
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
index 8d866542b..cd8d5a77e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
@@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.unmarshalling._
-import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, _}
+import akka.stream.{KillSwitches, QueueOfferResult}
+import org.apache.openwhisk.common.AkkaLogging
import spray.json._
-import scala.concurrent.{ExecutionContext, Future, Promise}
+
import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
/**
@@ -45,10 +47,10 @@ class PoolingRestClient(
port: Int,
queueSize: Int,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
- timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
+ timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: ExecutionContext) {
require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
- protected implicit val context: ExecutionContext = system.dispatcher
+ private val logging = new AkkaLogging(system.log)
//if specified, override the ClientConnection idle-timeout and keepalive socket option value
private val timeoutSettings = {
@@ -72,16 +74,19 @@ class PoolingRestClient(
// Additional queue in case all connections are busy. Should hardly ever be
// filled in practice but can be useful, e.g., in tests starting many
// asynchronous requests in a very short period of time.
- private val requestQueue = Source
- .queue(queueSize, OverflowStrategy.dropNew)
+ private val ((requestQueue, killSwitch), sinkCompletion) = Source
+ .queue(queueSize)
.via(httpFlow.getOrElse(pool))
+ .viaMat(KillSwitches.single)(Keep.both)
.toMat(Sink.foreach({
case (Success(response), p) =>
p.success(response)
case (Failure(error), p) =>
p.failure(error)
- }))(Keep.left)
- .run
+ }))(Keep.both)
+ .run()
+
+ sinkCompletion.onComplete(_ => shutdown())
/**
* Execute an HttpRequest on the underlying connection pool.
@@ -96,10 +101,10 @@ class PoolingRestClient(
// When the future completes, we know whether the request made it
// through the queue.
- requestQueue.offer(request -> promise).flatMap {
+ requestQueue.offer(request -> promise) match {
case QueueOfferResult.Enqueued => promise.future
- case QueueOfferResult.Dropped => Future.failed(new Exception("DB request queue is full."))
- case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB request queue was closed."))
+ case QueueOfferResult.Dropped => Future.failed(new Exception("Request queue is full."))
+ case QueueOfferResult.QueueClosed => Future.failed(new Exception("Request queue was closed."))
case QueueOfferResult.Failure(f) => Future.failed(f)
}
}
@@ -127,7 +132,13 @@ class PoolingRestClient(
}
}
- def shutdown(): Future[Unit] = Future.unit
+ def shutdown(): Future[Unit] = {
+ killSwitch.shutdown()
+ Try(requestQueue.complete()).recover {
+ case t: IllegalStateException => logging.warn(this, t.getMessage)
+ }
+ Future.unit
+ }
}
object PoolingRestClient {
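For reference, a minimal standalone sketch of the clean-up pattern the new PoolingRestClient wiring relies on, assuming Akka Streams 2.6.x; the element type, buffer size, and sink are placeholders:

    import akka.actor.ActorSystem
    import akka.stream.KillSwitches
    import akka.stream.scaladsl.{Keep, Sink, Source}
    import scala.concurrent.Future
    import scala.util.Try

    object CleanupSketch {
      implicit val system: ActorSystem = ActorSystem("sketch")
      import system.dispatcher

      // Materialize the bounded queue, a kill switch, and the sink's completion
      // future together, so every resource is reachable for clean-up.
      val ((queue, killSwitch), sinkCompletion) = Source
        .queue[Int](16)
        .viaMat(KillSwitches.single)(Keep.both)
        .toMat(Sink.ignore)(Keep.both)
        .run()

      // If the downstream completes or fails, release everything proactively.
      sinkCompletion.onComplete(_ => shutdown())

      def shutdown(): Future[Unit] = {
        killSwitch.shutdown() // tears down the running stream stages
        // complete() may throw if the queue was already completed; the patch above
        // guards this with Try and a recover that only logs a warning.
        Try(queue.complete())
        Future.unit
      }
    }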