This is an automated email from the ASF dual-hosted git repository.

dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ae0c4e0c3 Use max-connection-pool as queue size (#5453)
ae0c4e0c3 is described below

commit ae0c4e0c37a77de4c23ca95dee763b165ce20d18
Author: Dominic Kim <[email protected]>
AuthorDate: Sat Nov 4 02:07:53 2023 +0900

    Use max-connection-pool as queue size (#5453)
---
 .../core/containerpool/AkkaContainerClient.scala   | 36 ++++++++--------------
 1 file changed, 13 insertions(+), 23 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 03dcf7833..e4958d2d2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -20,37 +20,26 @@ package org.apache.openwhisk.core.containerpool
 import akka.actor.ActorSystem
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.HttpMethods
-import akka.http.scaladsl.model.HttpRequest
-import akka.http.scaladsl.model.HttpResponse
-import akka.http.scaladsl.model.MediaTypes
-import akka.http.scaladsl.model.MessageEntity
-import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers.Accept
 import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.StreamTcpException
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
+import akka.stream.scaladsl.{Sink, Source}
 import akka.util.ByteString
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration._
-import scala.util.Try
-import scala.util.control.NonFatal
-import spray.json._
-import org.apache.openwhisk.common.Logging
 import org.apache.openwhisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES
-import org.apache.openwhisk.common.MetricEmitter
-import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
-import org.apache.openwhisk.core.entity.ActivationResponse._
-import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
+import org.apache.openwhisk.common.{Logging, MetricEmitter, TransactionId}
+import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerHttpError, _}
 import org.apache.openwhisk.core.entity.size.SizeLong
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
 import org.apache.openwhisk.http.PoolingRestClient
+import pureconfig.loadConfigOrThrow
+import spray.json._
 
 import java.time.Instant
+import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
+import scala.concurrent.duration._
+import scala.util.Try
+import scala.util.control.NonFatal
 
 /**
  * This HTTP client is used only in the invoker to communicate with the action container.
@@ -193,6 +182,7 @@ protected class AkkaContainerClient(
 }
 
 object AkkaContainerClient {
+  private val queueSize = loadConfigOrThrow[Int]("akka.http.host-connection-pool.max-connections")
 
   /** A helper method to post one single request to a connection. Used for container tests. */
   def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
@@ -226,7 +216,7 @@ object AkkaContainerClient {
     tid: TransactionId,
     as: ActorSystem,
     ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
-    val connection = new AkkaContainerClient(host, port, timeout, 1)
+    val connection = new AkkaContainerClient(host, port, timeout, queueSize)
     val futureResults = contents.map { executeRequest(connection, endPoint, _) }
     val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
     connection.close()

Reply via email to