This is an automated email from the ASF dual-hosted git repository.
dgrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new ae0c4e0c3 Use max-connection-pool as queue size (#5453)
ae0c4e0c3 is described below
commit ae0c4e0c37a77de4c23ca95dee763b165ce20d18
Author: Dominic Kim <[email protected]>
AuthorDate: Sat Nov 4 02:07:53 2023 +0900
Use max-connection-pool as queue size (#5453)
---
.../core/containerpool/AkkaContainerClient.scala | 36 ++++++++--------------
1 file changed, 13 insertions(+), 23 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
index 03dcf7833..e4958d2d2 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala
@@ -20,37 +20,26 @@ package org.apache.openwhisk.core.containerpool
import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.marshalling.Marshal
-import akka.http.scaladsl.model.HttpMethods
-import akka.http.scaladsl.model.HttpRequest
-import akka.http.scaladsl.model.HttpResponse
-import akka.http.scaladsl.model.MediaTypes
-import akka.http.scaladsl.model.MessageEntity
-import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.StreamTcpException
-import akka.stream.scaladsl.Sink
-import akka.stream.scaladsl.Source
+import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.TimeoutException
-import scala.concurrent.duration._
-import scala.util.Try
-import scala.util.control.NonFatal
-import spray.json._
-import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES
-import org.apache.openwhisk.common.MetricEmitter
-import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.entity.ActivationResponse.ContainerHttpError
-import org.apache.openwhisk.core.entity.ActivationResponse._
-import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
+import org.apache.openwhisk.common.{Logging, MetricEmitter, TransactionId}
+import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerHttpError, _}
import org.apache.openwhisk.core.entity.size.SizeLong
+import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ByteSize}
import org.apache.openwhisk.http.PoolingRestClient
+import pureconfig.loadConfigOrThrow
+import spray.json._
import java.time.Instant
+import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
+import scala.concurrent.duration._
+import scala.util.Try
+import scala.util.control.NonFatal
/**
* This HTTP client is used only in the invoker to communicate with the action
container.
@@ -193,6 +182,7 @@ protected class AkkaContainerClient(
}
object AkkaContainerClient {
+ private val queueSize = loadConfigOrThrow[Int]("akka.http.host-connection-pool.max-connections")
/** A helper method to post one single request to a connection. Used for
container tests. */
def post(host: String, port: Int, endPoint: String, content: JsValue,
timeout: FiniteDuration)(
@@ -226,7 +216,7 @@ object AkkaContainerClient {
tid: TransactionId,
as: ActorSystem,
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
- val connection = new AkkaContainerClient(host, port, timeout, 1)
+ val connection = new AkkaContainerClient(host, port, timeout, queueSize)
val futureResults = contents.map { executeRequest(connection, endPoint, _)
}
val results = Await.result(Future.sequence(futureResults), timeout +
10.seconds) //additional timeout to complete futures
connection.close()