This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new c53cacb Refactor KubernetesClient to separate invokerAgent; Add forwarding LogStoreProvider. (#3411) c53cacb is described below commit c53cacba0e8d4f3c655e23ce2c558716b7f0a472 Author: David Grove <dgrove-...@users.noreply.github.com> AuthorDate: Mon Mar 12 04:13:14 2018 -0400 Refactor KubernetesClient to separate invokerAgent; Add forwarding LogStoreProvider. (#3411) This commit pushes the invokerAgent functionality into a new subclass of KubernetesClient, KubernetesClientWithInvokerAgent, to achieve a cleaner separation of concerns. It also adds a new LogDriverLogStore implementation for Kubernetes that uses the invokerAgent to perform remote processing and forwarding of action logs. This enables more efficient log processing than relying on the Kubernetes API to retrieve the logs and stream them through the invoker. It also adds a unit test for the log forwarding support in KubernetesContainer and some minor cleanup of the test cases to uniformly use the await helper function. 
--- .../kubernetes/KubernetesClient.scala | 81 ++--------- .../KubernetesClientWithInvokerAgent.scala | 158 +++++++++++++++++++++ .../kubernetes/KubernetesContainer.scala | 32 ++++- .../kubernetes/KubernetesContainerFactory.scala | 14 +- .../KubernetesInvokerAgentLogStore.scala | 76 ++++++++++ .../kubernetes/test/KubernetesClientTests.scala | 54 ++++--- .../kubernetes/test/KubernetesContainerTests.scala | 26 +++- 7 files changed, 353 insertions(+), 88 deletions(-) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala index 6853f66..4cfc813 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala @@ -26,11 +26,10 @@ import java.time.format.DateTimeFormatterBuilder import akka.actor.ActorSystem import akka.event.Logging.{ErrorLevel, InfoLevel} -import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} +import akka.http.scaladsl.model.Uri import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model.Uri.Query import akka.stream.{Attributes, Outlet, SourceShape} -import akka.http.scaladsl.Http import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.stream.stage._ @@ -82,10 +81,10 @@ case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int) case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig) /** - * Serves as interface to the kubectl CLI tool. + * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI. * - * Be cautious with the ExecutionContext passed to this, as the - * calls to the CLI are blocking. + * Be cautious with the ExecutionContext passed to this, as many + * operations are blocking. 
* * You only need one instance (and you shouldn't get more). */ @@ -94,9 +93,9 @@ class KubernetesClient( executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem) extends KubernetesApi with ProcessRunner { - implicit private val ec = executionContext - implicit private val am = ActorMaterializer() - implicit private val kubeRestClient = new DefaultKubernetesClient( + implicit protected val ec = executionContext + implicit protected val am = ActorMaterializer() + implicit protected val kubeRestClient = new DefaultKubernetesClient( new ConfigBuilder() .withConnectionTimeout(config.timeouts.logs.toMillis.toInt) .withRequestTimeout(config.timeouts.logs.toMillis.toInt) @@ -171,56 +170,14 @@ class KubernetesClient( } def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = { - if (ensureUnpaused && config.invokerAgent.enabled) { - // The caller can't guarantee that every container with the label key=value is already unpaused. - // Therefore we must enumerate them and ensure they are unpaused before we attempt to delete them. - Future { - blocking { - kubeRestClient - .inNamespace(kubeRestClient.getNamespace) - .pods() - .withLabel(key, value) - .list() - .getItems - .asScala - .map { pod => - val container = toContainer(pod) - container - .resume() - .recover { case _ => () } // Ignore errors; it is possible the container was not actually suspended. 
- .map(_ => rm(container)) - } - } - }.flatMap(futures => - Future - .sequence(futures) - .map(_ => ())) - } else { - runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ()) - } + runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ()) } - def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { - if (config.invokerAgent.enabled) { - agentCommand("suspend", container) - .map { response => - response.discardEntityBytes() - } - } else { - Future.successful({}) - } - } + // suspend is a no-op with the basic KubernetesClient + def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = Future.successful({}) - def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { - if (config.invokerAgent.enabled) { - agentCommand("resume", container) - .map { response => - response.discardEntityBytes() - } - } else { - Future.successful({}) - } - } + // resume is a no-op with the basic KubernetesClient + def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = Future.successful({}) def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)( implicit transid: TransactionId): Source[TypedLogLine, Any] = { @@ -233,7 +190,7 @@ class KubernetesClient( } - private def toContainer(pod: Pod): KubernetesContainer = { + protected def toContainer(pod: Pod): KubernetesContainer = { val id = ContainerId(pod.getMetadata.getName) val addr = ContainerAddress(pod.getStatus.getPodIP) val workerIP = pod.getStatus.getHostIP @@ -244,17 +201,7 @@ class KubernetesClient( new KubernetesContainer(id, addr, workerIP, nativeContainerId) } - // Forward a command to invoker-agent daemonset instance on container's worker node - private def agentCommand(command: String, container: KubernetesContainer): Future[HttpResponse] = { - val uri = Uri() - 
.withScheme("http") - .withHost(container.workerIP) - .withPort(config.invokerAgent.port) - .withPath(Path / command / container.nativeContainerId) - Http().singleRequest(HttpRequest(uri = uri)) - } - - private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { + protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { val cmd = kubectlCmd ++ args val start = transid.started( this, diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala new file mode 100644 index 0000000..417a287 --- /dev/null +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.containerpool.kubernetes + +import whisk.common.{Logging, TransactionId} +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MessageEntity, Uri} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import pureconfig.loadConfigOrThrow +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import whisk.core.ConfigKeys +import whisk.core.entity.ByteSize + +import collection.JavaConverters._ +import scala.concurrent.{blocking, ExecutionContext, Future} + +/** + * An extended kubernetes client that works in tandem with an invokerAgent DaemonSet with + * instances running on every worker node that runs user containers to provide + * suspend/resume capability and higher performance log processing. + */ +class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig = + loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))( + executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem) + extends KubernetesClient(config)(executionContext) + with KubernetesApiWithInvokerAgent { + + override def rm(key: String, value: String, ensureUnpaused: Boolean = false)( + implicit transid: TransactionId): Future[Unit] = { + if (ensureUnpaused) { + // The caller can't guarantee that every container with the label key=value is already unpaused. + // Therefore we must enumerate them and ensure they are unpaused before we attempt to delete them. + Future { + blocking { + kubeRestClient + .inNamespace(kubeRestClient.getNamespace) + .pods() + .withLabel(key, value) + .list() + .getItems + .asScala + .map { pod => + val container = toContainer(pod) + container + .resume() + .recover { case _ => () } // Ignore errors; it is possible the container was not actually suspended. 
+ .map(_ => rm(container)) + } + } + }.flatMap(futures => + Future + .sequence(futures) + .map(_ => ())) + } else { + super.rm(key, value, ensureUnpaused) + } + } + + override def suspend(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { + agentCommand("suspend", container) + .map(_.discardEntityBytes()) + } + + override def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { + agentCommand("resume", container) + .map(_.discardEntityBytes()) + } + + override def forwardLogs(container: KubernetesContainer, + lastOffset: Long, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = { + val serializedData = Map( + "lastOffset" -> JsNumber(lastOffset), + "sizeLimit" -> JsNumber(sizeLimit.toBytes), + "sentinelledLogs" -> JsBoolean(sentinelledLogs), + "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)), + "encodedActivation" -> JsString(augmentedActivation.compactPrint)) + + agentCommand("logs", container, Some(serializedData)) + .flatMap(response => Unmarshal(response.entity).to[String].map(_.toLong)) + } + + override def agentCommand(command: String, + container: KubernetesContainer, + payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = { + val uri = Uri() + .withScheme("http") + .withHost(container.workerIP) + .withPort(config.invokerAgent.port) + .withPath(Path / command / container.nativeContainerId) + + Marshal(payload).to[MessageEntity].flatMap { entity => + Http().singleRequest(HttpRequest(uri = uri, entity = entity)) + } + } + + private def fieldsString(fields: Map[String, JsValue]) = + fields + .map { + case (key, value) => s""""$key":${value.compactPrint}""" + } + .mkString(",") +} + +trait KubernetesApiWithInvokerAgent extends KubernetesApi { + + /** + * Request the invokerAgent running on the container's worker node to execute the 
given command + * @param command The command verb to execute + * @param container The container to which the command should be applied + * @param payload The additional data needed to execute the command. + * @return The HTTPResponse from the remote agent. + */ + def agentCommand(command: String, + container: KubernetesContainer, + payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] + + /** + * Forward a section of the argument container's stdout/stderr output to an external logging service. + * + * @param container the container whose logs should be forwarded + * @param lastOffset the last offset previously read in the remote log file + * @param sizeLimit The maximum number of bytes of log that should be forwarded before truncation + * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams? + * @param additionalMetadata Additional metadata that should be injected into every log line + * @param augmentedActivation Activation record to be appended to the forwarded log. 
+ * @return the last offset read from the remote log file (to be used on next call to forwardLogs) + */ + def forwardLogs(container: KubernetesContainer, + lastOffset: Long, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] +} diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala index f1d03d1..11ebbbf 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala @@ -18,12 +18,13 @@ package whisk.core.containerpool.kubernetes import java.time.Instant -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicLong, AtomicReference} import akka.stream.StreamLimitReachedException import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.Source import akka.util.ByteString +import spray.json._ import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -97,6 +98,9 @@ class KubernetesContainer(protected[core] val id: ContainerId, /** The last read timestamp in the log file */ private val lastTimestamp = new AtomicReference[Option[Instant]](None) + /** The last offset read in the remote log file */ + private val lastOffset = new AtomicLong(0) + protected val waitForLogs: FiniteDuration = 2.seconds def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this) @@ -110,6 +114,31 @@ class KubernetesContainer(protected[core] val id: ContainerId, private val stringSentinel = DockerContainer.ActivationSentinel.utf8String + /** + * Request that the activation's log output be forwarded to an external log service (implicit in LogProvider choice). 
+ * Additional per log line metadata and the activation record are provided to be optionally included + * in the forwarded log entry. + * + * @param sizeLimit The maximum number of bytes of log that should be forwarded + * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams? + * @param additionalMetadata Additional metadata that should be injected into every log line + * @param augmentedActivation Activation record to be appended to the forwarded log. + */ + def forwardLogs(sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Unit] = { + kubernetes match { + case client: KubernetesApiWithInvokerAgent => { + client + .forwardLogs(this, lastOffset.get, sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation) + .map(newOffset => lastOffset.set(newOffset)) + } + case _ => + Future.failed(new UnsupportedOperationException("forwardLogs requires whisk.kubernetes.invokerAgent.enabled")) + } + } + def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = { kubernetes @@ -136,5 +165,4 @@ class KubernetesContainer(protected[core] val id: ContainerId, line.toByteString } } - } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala index b06ec5c..8b2d918 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala @@ -18,6 +18,7 @@ package whisk.core.containerpool.kubernetes import akka.actor.ActorSystem +import pureconfig.loadConfigOrThrow import scala.concurrent.Await import scala.concurrent.ExecutionContext @@ -32,14 +33,23 @@ import 
whisk.core.containerpool.ContainerFactoryProvider import whisk.core.entity.ByteSize import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.InstanceId -import whisk.core.WhiskConfig +import whisk.core.{ConfigKeys, WhiskConfig} class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit actorSystem: ActorSystem, ec: ExecutionContext, logging: Logging) extends ContainerFactory { - implicit val kubernetes = new KubernetesClient()(ec) + implicit val kubernetes = initializeKubeClient() + + private def initializeKubeClient(): KubernetesClient = { + val config = loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes) + if (config.invokerAgent.enabled) { + new KubernetesClientWithInvokerAgent(config)(ec) + } else { + new KubernetesClient(config)(ec) + } + } /** Perform cleanup on init */ override def init(): Unit = cleanup() diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala new file mode 100644 index 0000000..17f8363 --- /dev/null +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.kubernetes + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.containerpool.logging.{LogCollectingException, LogDriverLogStore, LogStore, LogStoreProvider} +import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} + +import scala.concurrent.{ExecutionContext, Future} + +/** + * A LogStore implementation for Kubernetes that delegates all log processing to a remote invokerAgent that + * runs on the worker node where the user container is executing. The remote invokerAgent will read container logs, + * enrich them with the activation-specific metadata it is provided, and consolidate them into a remote + * combined log file that can be processed asynchronously by log forwarding services. + * + * Logs are never processed by the invoker itself and therefore are not stored in the activation record; + * collectLogs will return an empty ActivationLogs. + */ +class KubernetesInvokerAgentLogStore(system: ActorSystem) extends LogDriverLogStore(system) { + implicit val ec: ExecutionContext = system.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer()(system) + + override def collectLogs(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction): Future[ActivationLogs] = { + + val sizeLimit = action.limits.logs.asMegaBytes + val sentinelledLogs = action.exec.sentinelledLogs + + // Add the userId field to every written record, so any background process can properly correlate. 
+ val userIdField = Map("namespaceId" -> user.authkey.uuid.toJson) + + val additionalMetadata = Map( + "activationId" -> activation.activationId.asString.toJson, + "action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField + + val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField) + + container match { + case kc: KubernetesContainer => { + kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)(transid) + .map { _ => + ActivationLogs() + } + } + case _ => Future.failed(LogCollectingException(ActivationLogs())) + } + } +} + +object KubernetesInvokerAgentLogStoreProvider extends LogStoreProvider { + override def logStore(actorSystem: ActorSystem): LogStore = new KubernetesInvokerAgentLogStore(actorSystem) +} diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala index 0eb4f95..5190a2f 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala @@ -20,6 +20,7 @@ package whisk.core.containerpool.kubernetes.test import java.time.Instant import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpResponse import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Concat, Sink, Source} @@ -37,15 +38,10 @@ import org.scalatest.Matchers import org.scalatest.time.{Seconds, Span} import common.{StreamLogging, WskActorSystem} import okio.Buffer +import spray.json.{JsObject, JsValue} import whisk.common.TransactionId import whisk.core.containerpool.{ContainerAddress, ContainerId} -import whisk.core.containerpool.kubernetes.{ - KubernetesApi, - KubernetesClient, - KubernetesContainer, - KubernetesRestLogSourceStage, - TypedLogLine -} +import whisk.core.containerpool.kubernetes._ import whisk.core.entity.ByteSize import 
whisk.core.entity.size._ @@ -65,6 +61,9 @@ class KubernetesClientTests implicit val materializer: ActorMaterializer = ActorMaterializer() + val commandTimeout = 500.milliseconds + def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout) + /** Reads logs into memory and awaits them */ def awaitLogs(source: Source[TypedLogLine, Any], timeout: FiniteDuration = 1000.milliseconds): Vector[TypedLogLine] = Await.result(source.runWith(Sink.seq[TypedLogLine]), timeout).toVector @@ -77,16 +76,17 @@ class KubernetesClientTests val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52") val container = kubernetesContainer(id) - val commandTimeout = 500.milliseconds - def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout) - val kubectlCommand = "kubectl" /** Returns a KubernetesClient with a mocked result for 'executeProcess' */ - def kubernetesClient(fixture: => Future[String]) = new KubernetesClient()(global) { - override def findKubectlCmd() = kubectlCommand - override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext, as: ActorSystem) = - fixture + def kubernetesClient(fixture: => Future[String]) = { + new KubernetesClient()(global) { + override def findKubectlCmd() = kubectlCommand + + override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext, + as: ActorSystem) = + fixture + } } def kubernetesContainer(id: ContainerId) = @@ -121,7 +121,7 @@ class KubernetesClientTests implicit val kubernetes = new TestKubernetesClient val id = ContainerId("id") val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo") - container.suspend() + await(container.suspend()) kubernetes.suspends should have size 1 kubernetes.suspends(0) shouldBe id } @@ -130,7 +130,7 @@ class KubernetesClientTests implicit val kubernetes = new TestKubernetesClient val id = ContainerId("id") val 
container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo") - container.resume() + await(container.resume()) kubernetes.resumes should have size 1 kubernetes.resumes(0) shouldBe id } @@ -237,4 +237,26 @@ object KubernetesClientTests { Source(List.empty[TypedLogLine]) } } + + class TestKubernetesClientWithInvokerAgent extends TestKubernetesClient with KubernetesApiWithInvokerAgent { + var agentCommands = mutable.Buffer.empty[(ContainerId, String, Option[Map[String, JsValue]])] + var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)] + + def agentCommand(command: String, + container: KubernetesContainer, + payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = { + agentCommands += ((container.id, command, payload)) + Future.successful(HttpResponse()) + } + + def forwardLogs(container: KubernetesContainer, + lastOffset: Long, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = { + forwardLogs += ((container.id, lastOffset)) + Future.successful(lastOffset + sizeLimit.toBytes) // for testing, pretend we read size limit bytes + } + } } diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala index 3415b68..fc3ba68 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala @@ -50,6 +50,7 @@ import whisk.core.entity.ActivationResponse.Timeout import whisk.core.entity.size._ import whisk.http.Messages import whisk.core.containerpool.docker.test.DockerContainerTests._ +import whisk.core.containerpool.kubernetes.test.KubernetesClientTests.TestKubernetesClientWithInvokerAgent import scala.collection.{immutable, 
mutable} @@ -84,8 +85,11 @@ class KubernetesContainerTests val toFormattedString: Flow[ByteString, String, NotUsed] = Flow[ByteString].map(_.utf8String.parseJson.convertTo[TypedLogLine].toString) + val commandTimeout = 500.milliseconds + def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = Await.result(f, timeout) + /** Reads logs into memory and awaits them */ - def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] = + def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = commandTimeout): Vector[String] = Await.result(source.via(toFormattedString).runWith(Sink.seq[String]), timeout).toVector val containerId = ContainerId("id") @@ -293,6 +297,26 @@ class KubernetesContainerTests } /* + * LOG FORWARDING + */ + it should "container should maintain lastOffset across calls to forwardLogs" in { + implicit val kubernetes = new TestKubernetesClientWithInvokerAgent + val id = ContainerId("id") + val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo") + val logChunk = 10.kilobytes + + await(container.forwardLogs(logChunk, false, Map.empty, JsObject())) + await(container.forwardLogs(42.bytes, false, Map.empty, JsObject())) + await(container.forwardLogs(logChunk, false, Map.empty, JsObject())) + await(container.forwardLogs(42.bytes, false, Map.empty, JsObject())) + + kubernetes.forwardLogs(0) shouldBe (id, 0) + kubernetes.forwardLogs(1) shouldBe (id, logChunk.toBytes) + kubernetes.forwardLogs(2) shouldBe (id, logChunk.toBytes + 42) + kubernetes.forwardLogs(3) shouldBe (id, 2 * logChunk.toBytes + 42) + } + + /* * LOGS */ it should "read a simple log with sentinel" in { -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.