This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 2312b7d Recreate HTTP client on Container.resume(). (#4185)
2312b7d is described below
commit 2312b7d6b913fc30bcd56643285fa29aaa18815c
Author: tysonnorris <[email protected]>
AuthorDate: Mon Jan 21 02:13:48 2019 -0800
Recreate HTTP client on Container.resume(). (#4185)
reopen connections only once, during Container.resume()
---
.../openwhisk/core/containerpool/Container.scala | 44 +++++++++++++++-------
.../apache/openwhisk/core/mesos/MesosTask.scala | 4 +-
.../containerpool/docker/DockerContainer.scala | 5 ++-
.../kubernetes/KubernetesContainer.scala | 3 +-
.../test/DockerToActivationLogStoreTests.scala | 2 +-
.../containerpool/test/ContainerProxyTests.scala | 18 ++++++---
6 files changed, 50 insertions(+), 26 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index a2e2709..354ec38 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -62,6 +62,11 @@ object Container {
loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
}
+/**
+ * Abstraction for Container operations.
+ * Container manipulation (specifically suspend/resume/destroy) is NOT
thread-safe and MUST be synchronized by caller.
+ * Container access (specifically run) is thread-safe (e.g. for concurrent
activation processing).
+ */
trait Container {
implicit protected val as: ActorSystem
@@ -73,7 +78,11 @@ trait Container {
/** HTTP connection to the container, will be lazily established by
callContainer */
protected var httpConnection: Option[ContainerClient] = None
- /** Stops the container from consuming CPU cycles. */
+ /** maxConcurrent+timeout are cached during first init, so that resuming
connections can reference */
+ protected var containerHttpMaxConcurrent: Int = 1
+ protected var containerHttpTimeout: FiniteDuration = 60.seconds
+
+ /** Stops the container from consuming CPU cycles. NOT thread-safe - caller
must synchronize. */
def suspend()(implicit transid: TransactionId): Future[Unit] = {
//close connection first, then close connection pool
//(testing pool recreation vs connection closing, time was similar - so
using the simpler recreation approach)
@@ -82,8 +91,11 @@ trait Container {
closeConnections(toClose)
}
- /** Dual of halt. */
- def resume()(implicit transid: TransactionId): Future[Unit]
+ /** Dual of halt. NOT thread-safe - caller must synchronize.*/
+ def resume()(implicit transid: TransactionId): Future[Unit] = {
+ httpConnection = Some(openConnections(containerHttpTimeout,
containerHttpMaxConcurrent))
+ Future.successful({})
+ }
/** Obtains logs up to a given threshold from the container. Optionally
waits for a sentinel to appear. */
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid:
TransactionId): Source[ByteString, Any]
@@ -101,7 +113,8 @@ trait Container {
LoggingMarkers.INVOKER_ACTIVATION_INIT,
s"sending initialization to $id $addr",
logLevel = InfoLevel)
-
+ containerHttpMaxConcurrent = maxConcurrent
+ containerHttpTimeout = timeout
val body = JsObject("value" -> initializer)
callContainer("/init", body, timeout, maxConcurrent, retry = true)
.andThen { // never fails
@@ -132,7 +145,7 @@ trait Container {
}
}
- /** Runs code in the container. */
+ /** Runs code in the container. Thread-safe - caller may invoke concurrently
for concurrent activation processing. */
def run(parameters: JsObject, environment: JsObject, timeout:
FiniteDuration, maxConcurrent: Int)(
implicit transid: TransactionId): Future[(Interval, ActivationResponse)] =
{
val actionName =
environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
@@ -185,15 +198,7 @@ trait Container {
retry: Boolean = false)(implicit transid:
TransactionId): Future[RunResult] = {
val started = Instant.now()
val http = httpConnection.getOrElse {
- val conn = if (Container.config.akkaClient) {
- new AkkaContainerClient(addr.host, addr.port, timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
- } else {
- new ApacheBlockingContainerClient(
- s"${addr.host}:${addr.port}",
- timeout,
- ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
- maxConcurrent)
- }
+ val conn = openConnections(timeout, maxConcurrent)
httpConnection = Some(conn)
conn
}
@@ -204,6 +209,17 @@ trait Container {
RunResult(Interval(started, finished), response)
}
}
+ private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
+ if (Container.config.akkaClient) {
+ new AkkaContainerClient(addr.host, addr.port, timeout,
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+ } else {
+ new ApacheBlockingContainerClient(
+ s"${addr.host}:${addr.port}",
+ timeout,
+ ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+ maxConcurrent)
+ }
+ }
private def closeConnections(toClose: Option[ContainerClient]): Future[Unit]
= {
toClose.map(_.close()).getOrElse(Future.successful(()))
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index 373b123..968f942 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -205,8 +205,8 @@ class MesosTask(override protected val id: ContainerId,
/** Dual of halt. */
override def resume()(implicit transid: TransactionId): Future[Unit] = {
- // resume not supported
- Future.successful(Unit)
+ super.resume()
+ // resume not supported (just return result from super)
}
/** Completely destroys this instance of the container. */
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index 7ac95e4..731966b 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -180,8 +180,9 @@ class DockerContainer(protected val id: ContainerId,
override def suspend()(implicit transid: TransactionId): Future[Unit] = {
super.suspend().flatMap(_ => if (useRunc) runc.pause(id) else
docker.pause(id))
}
- def resume()(implicit transid: TransactionId): Future[Unit] =
- if (useRunc) { runc.resume(id) } else { docker.unpause(id) }
+ override def resume()(implicit transid: TransactionId): Future[Unit] = {
+ (if (useRunc) { runc.resume(id) } else { docker.unpause(id) }).flatMap(_
=> super.resume())
+ }
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
super.destroy()
docker.rm(id)
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 4a61647..74fd292 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -104,7 +104,8 @@ class KubernetesContainer(protected[core] val id:
ContainerId,
super.suspend().flatMap(_ => kubernetes.suspend(this))
}
- def resume()(implicit transid: TransactionId): Future[Unit] =
kubernetes.resume(this)
+ override def resume()(implicit transid: TransactionId): Future[Unit] =
+ kubernetes.resume(this).flatMap(_ => super.resume())
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
super.destroy()
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 349fc31..6f4caf2 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -107,7 +107,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with
Matchers with WskAct
val logging: Logging)
extends Container {
override def suspend()(implicit transid: TransactionId): Future[Unit] = ???
- def resume()(implicit transid: TransactionId): Future[Unit] = ???
+ override def resume()(implicit transid: TransactionId): Future[Unit] = ???
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid:
TransactionId) = lines
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index d61cb84..0477fcc 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.containerpool.test
import java.time.Instant
-
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.actor.{ActorRef, ActorSystem, FSM}
import akka.stream.scaladsl.Source
@@ -26,7 +25,6 @@ import akka.testkit.{ImplicitSender, TestKit}
import akka.util.ByteString
import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction,
WhiskProperties}
import java.util.concurrent.atomic.AtomicInteger
-
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
@@ -41,7 +39,7 @@ import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
-
+import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -1079,11 +1077,19 @@ class ContainerProxyTests
def runCount = atomicRunCount.get()
override def suspend()(implicit transid: TransactionId): Future[Unit] = {
suspendCount += 1
- super.suspend()
+ val s = super.suspend()
+ Await.result(s, 5.seconds)
+ //verify that httpconn is closed
+ httpConnection should be(None)
+ s
}
- def resume()(implicit transid: TransactionId): Future[Unit] = {
+ override def resume()(implicit transid: TransactionId): Future[Unit] = {
resumeCount += 1
- Future.successful(())
+ val r = super.resume()
+ Await.result(r, 5.seconds)
+ //verify that httpconn is recreated
+ httpConnection should be('defined)
+ r
}
override def destroy()(implicit transid: TransactionId): Future[Unit] = {
destroyCount += 1