This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f11d48b2 Memory leak in `akka.actor.LocalActorRef` (#5442)
6f11d48b2 is described below

commit 6f11d48b216a01b7c6fa3342d2b8b972109106f7
Author: Yevhen Sentiabov <[email protected]>
AuthorDate: Fri Sep 29 08:33:49 2023 -0500

    Memory leak in `akka.actor.LocalActorRef` (#5442)
    
    * - Replaced the usage of deprecated OverflowStrategy.dropNew with BoundedSourceQueueStage
     - Added proper clean-up of materialized resources to prevent memory leaks for long-running streams
    
    * - Added an execution context to the PoolingRestClient to be able to propagate it correctly from custom implementations
---
 .../core/containerpool/AkkaContainerClient.scala   |  4 +--
 .../logging/ElasticSearchLogStore.scala            |  2 +-
 .../logging/ElasticSearchRestClient.scala          | 11 +++----
 .../core/database/CouchDbRestClient.scala          |  6 ++--
 .../apache/openwhisk/http/PoolingRestClient.scala  | 35 ++++++++++++++--------
 5 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 8dbc41f8a..03dcf7833 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -70,8 +70,8 @@ protected class AkkaContainerClient(
   port: Int,
   timeout: FiniteDuration,
   queueSize: Int,
-  retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, 
as: ActorSystem)
-    extends PoolingRestClient("http", hostname, port, queueSize, timeout = 
Some(timeout))
+  retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, 
as: ActorSystem, ec: ExecutionContext)
+    extends PoolingRestClient("http", hostname, port, queueSize, timeout = 
Some(timeout))(as, ec)
     with ContainerClient {
 
   def close() = shutdown()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
index 65033d3db..7c2faa908 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchLogStore.scala
@@ -79,7 +79,7 @@ class ElasticSearchLogStore(
     elasticSearchConfig.protocol,
     elasticSearchConfig.host,
     elasticSearchConfig.port,
-    httpFlow)
+    httpFlow)(system, system.dispatcher)
 
   private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
     
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala
index edd7d857f..04343dcdd 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -17,20 +17,16 @@
 
 package org.apache.openwhisk.core.containerpool.logging
 
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Either, Try}
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.HttpMethods.{GET, POST}
 import akka.http.scaladsl.model.headers.Accept
 import akka.stream.scaladsl.Flow
 
-import scala.concurrent.Promise
 import scala.util.Try
-
 import spray.json._
-
 import org.apache.openwhisk.http.PoolingRestClient
 import org.apache.openwhisk.http.PoolingRestClient._
 
@@ -154,8 +150,9 @@ class ElasticSearchRestClient(
   host: String,
   port: Int,
   httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), 
(Try[HttpResponse], Promise[HttpResponse]), Any]] = None)(
-  implicit system: ActorSystem)
-    extends PoolingRestClient(protocol, host, port, 16 * 1024, httpFlow) {
+  implicit system: ActorSystem,
+  ec: ExecutionContext)
+    extends PoolingRestClient(protocol, host, port, 16 * 1024, 
httpFlow)(system, ec) {
 
   import ElasticSearchJsonProtocol._
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala
index ff0e74b73..823a1679a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/CouchDbRestClient.scala
@@ -42,9 +42,11 @@ import scala.concurrent.{ExecutionContext, Future}
 class CouchDbRestClient(protocol: String, host: String, port: Int, username: 
String, password: String, db: String)(
   implicit system: ActorSystem,
   logging: Logging)
-    extends PoolingRestClient(protocol, host, port, 16 * 1024) {
+    extends PoolingRestClient(protocol, host, port, 16 * 1024)(
+      system,
+      system.dispatchers.lookup("dispatchers.couch-dispatcher")) {
 
-  protected implicit override val context: ExecutionContext = 
system.dispatchers.lookup("dispatchers.couch-dispatcher")
+  protected implicit val context: ExecutionContext = 
system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
   // Headers common to all requests.
   protected val baseHeaders: List[HttpHeader] =
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
index 8d866542b..cd8d5a77e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/http/PoolingRestClient.scala
@@ -24,11 +24,13 @@ import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.settings.ConnectionPoolSettings
 import akka.http.scaladsl.unmarshalling._
-import akka.stream.{OverflowStrategy, QueueOfferResult}
 import akka.stream.scaladsl.{Flow, _}
+import akka.stream.{KillSwitches, QueueOfferResult}
+import org.apache.openwhisk.common.AkkaLogging
 import spray.json._
-import scala.concurrent.{ExecutionContext, Future, Promise}
+
 import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success, Try}
 
 /**
@@ -45,10 +47,10 @@ class PoolingRestClient(
   port: Int,
   queueSize: Int,
   httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), 
(Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
-  timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) {
+  timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem, ec: 
ExecutionContext) {
   require(protocol == "http" || protocol == "https", "Protocol must be one of 
{ http, https }.")
 
-  protected implicit val context: ExecutionContext = system.dispatcher
+  private val logging = new AkkaLogging(system.log)
 
   //if specified, override the ClientConnection idle-timeout and keepalive 
socket option value
   private val timeoutSettings = {
@@ -72,16 +74,19 @@ class PoolingRestClient(
   // Additional queue in case all connections are busy. Should hardly ever be
   // filled in practice but can be useful, e.g., in tests starting many
   // asynchronous requests in a very short period of time.
-  private val requestQueue = Source
-    .queue(queueSize, OverflowStrategy.dropNew)
+  private val ((requestQueue, killSwitch), sinkCompletion) = Source
+    .queue(queueSize)
     .via(httpFlow.getOrElse(pool))
+    .viaMat(KillSwitches.single)(Keep.both)
     .toMat(Sink.foreach({
       case (Success(response), p) =>
         p.success(response)
       case (Failure(error), p) =>
         p.failure(error)
-    }))(Keep.left)
-    .run
+    }))(Keep.both)
+    .run()
+
+  sinkCompletion.onComplete(_ => shutdown())
 
   /**
    * Execute an HttpRequest on the underlying connection pool.
@@ -96,10 +101,10 @@ class PoolingRestClient(
 
     // When the future completes, we know whether the request made it
     // through the queue.
-    requestQueue.offer(request -> promise).flatMap {
+    requestQueue.offer(request -> promise) match {
       case QueueOfferResult.Enqueued    => promise.future
-      case QueueOfferResult.Dropped     => Future.failed(new Exception("DB 
request queue is full."))
-      case QueueOfferResult.QueueClosed => Future.failed(new Exception("DB 
request queue was closed."))
+      case QueueOfferResult.Dropped     => Future.failed(new 
Exception("Request queue is full."))
+      case QueueOfferResult.QueueClosed => Future.failed(new 
Exception("Request queue was closed."))
       case QueueOfferResult.Failure(f)  => Future.failed(f)
     }
   }
@@ -127,7 +132,13 @@ class PoolingRestClient(
       }
     }
 
-  def shutdown(): Future[Unit] = Future.unit
+  def shutdown(): Future[Unit] = {
+    killSwitch.shutdown()
+    Try(requestQueue.complete()).recover {
+      case t: IllegalStateException => logging.warn(this, t.getMessage)
+    }
+    Future.unit
+  }
 }
 
 object PoolingRestClient {

Reply via email to