diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala
b/common/scala/src/main/scala/whisk/core/connector/Message.scala
index 55e5a64e62..319b0b5661 100644
--- a/common/scala/src/main/scala/whisk/core/connector/Message.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala
@@ -48,6 +48,7 @@ case class ActivationMessage(override val transid:
TransactionId,
activationId: ActivationId,
rootControllerIndex: ControllerInstanceId,
blocking: Boolean,
+ blockingLogs: Boolean,
content: Option[JsObject],
cause: Option[ActivationId] = None,
traceContext: Option[Map[String, String]] = None)
@@ -68,7 +69,7 @@ object ActivationMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
- implicit val serdes = jsonFormat10(ActivationMessage.apply)
+ implicit val serdes = jsonFormat11(ActivationMessage.apply)
}
/**
diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
index bd92c581eb..35f3f1db71 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala
@@ -215,34 +215,43 @@ trait WhiskActionsApi extends WhiskCollectionAPI with
PostActionActivation with
implicit transid: TransactionId) = {
parameter(
'blocking ? false,
+ 'logs ? false,
'result ? false,
- 'timeout.as[FiniteDuration] ?
WhiskActionsApi.maxWaitForBlockingActivation) { (blocking, result,
waitOverride) =>
- entity(as[Option[JsObject]]) { payload =>
- getEntity(WhiskActionMetaData.get(entityStore, entityName.toDocId),
Some {
- act: WhiskActionMetaData =>
- // resolve the action --- special case for sequences that may
contain components with '_' as default package
- val action = act.resolve(user.namespace)
- onComplete(entitleReferencedEntitiesMetaData(user,
Privilege.ACTIVATE, Some(action.exec))) {
- case Success(_) =>
- val actionWithMergedParams = env.map(action.inherit(_))
getOrElse action
-
- // incoming parameters may not override final parameters
(i.e., parameters with already defined values)
- // on an action once its parameters are resolved across
package and binding
- val allowInvoke = payload
- .map(_.fields.keySet.forall(key =>
!actionWithMergedParams.immutableParameters.contains(key)))
- .getOrElse(true)
-
- if (allowInvoke) {
- doInvoke(user, actionWithMergedParams, payload, blocking,
waitOverride, result)
- } else {
- terminate(BadRequest, Messages.parametersNotAllowed)
- }
-
- case Failure(f) =>
- super.handleEntitlementFailure(f)
- }
- })
- }
+ 'timeout.as[FiniteDuration] ?
WhiskActionsApi.maxWaitForBlockingActivation) {
+ (blocking, logs, result, waitOverride) =>
+ entity(as[Option[JsObject]]) { payload =>
+ getEntity(WhiskActionMetaData.get(entityStore, entityName.toDocId),
Some {
+ act: WhiskActionMetaData =>
+ // resolve the action --- special case for sequences that may
contain components with '_' as default package
+ val action = act.resolve(user.namespace)
+ onComplete(entitleReferencedEntitiesMetaData(user,
Privilege.ACTIVATE, Some(action.exec))) {
+ case Success(_) =>
+ val actionWithMergedParams = env.map(action.inherit(_))
getOrElse action
+
+ // incoming parameters may not override final parameters
(i.e., parameters with already defined values)
+ // on an action once its parameters are resolved across
package and binding
+ val allowInvoke = payload
+ .map(_.fields.keySet.forall(key =>
!actionWithMergedParams.immutableParameters.contains(key)))
+ .getOrElse(true)
+
+ if (allowInvoke) {
+ doInvoke(
+ user,
+ actionWithMergedParams,
+ payload,
+ blocking,
+ waitOverride,
+ blocking && logs && !result,
+ result)
+ } else {
+ terminate(BadRequest, Messages.parametersNotAllowed)
+ }
+
+ case Failure(f) =>
+ super.handleEntitlementFailure(f)
+ }
+ })
+ }
}
}
@@ -251,9 +260,10 @@ trait WhiskActionsApi extends WhiskCollectionAPI with
PostActionActivation with
payload: Option[JsObject],
blocking: Boolean,
waitOverride: FiniteDuration,
+ blockingLogs: Boolean,
result: Boolean)(implicit transid: TransactionId):
RequestContext => Future[RouteResult] = {
val waitForResponse = if (blocking) Some(waitOverride) else None
- onComplete(invokeAction(user, actionWithMergedParams, payload,
waitForResponse, cause = None)) {
+ onComplete(invokeAction(user, actionWithMergedParams, payload,
waitForResponse, blockingLogs, cause = None)) {
case Success(Left(activationId)) =>
// non-blocking invoke or blocking invoke which got queued instead
respondWithActivationIdHeader(activationId) {
diff --git
a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
index 6f526575f2..7e6716e2ff 100644
--- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala
@@ -629,7 +629,13 @@ trait WhiskWebActionsApi extends Directives with
ValidateRequestSize with PostAc
if (isRawHttpAction || context
.overrides(webApiDirectives.reservedProperties ++
action.immutableParameters)) {
val content = context.toActionArgument(onBehalfOf, isRawHttpAction)
- invokeAction(actionOwnerIdentity, action, Some(JsObject(content)),
maxWaitForWebActionResult, cause = None)
+ invokeAction(
+ actionOwnerIdentity,
+ action,
+ Some(JsObject(content)),
+ maxWaitForWebActionResult,
+ false,
+ cause = None)
} else {
Future.failed(RejectRequest(BadRequest, Messages.parametersNotAllowed))
}
diff --git
a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
index 5548229ca8..eb8828cd5e 100644
---
a/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
+++
b/core/controller/src/main/scala/whisk/core/controller/actions/PostActionActivation.scala
@@ -49,6 +49,7 @@ protected[core] trait PostActionActivation extends
PrimitiveActions with Sequenc
action: WhiskActionMetaData,
payload: Option[JsObject],
waitForResponse: Option[FiniteDuration],
+ responseWithLogs: Boolean,
cause: Option[ActivationId])(implicit transid: TransactionId):
Future[Either[ActivationId, WhiskActivation]] = {
action.toExecutableWhiskAction match {
// this is a topmost sequence
@@ -57,7 +58,7 @@ protected[core] trait PostActionActivation extends
PrimitiveActions with Sequenc
invokeSequence(user, action, components, payload, waitForResponse,
cause, topmost = true, 0).map(r => r._1)
// a non-deprecated ExecutableWhiskAction
case Some(executable) if !executable.exec.deprecated =>
- invokeSingleAction(user, executable, payload, waitForResponse, cause)
+ invokeSingleAction(user, executable, payload, waitForResponse,
responseWithLogs, cause)
// a deprecated exec
case _ =>
Future.failed(RejectRequest(BadRequest,
Messages.runtimeDeprecated(action.exec)))
diff --git
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
index d0d4f60c50..089481e502 100644
---
a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
+++
b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala
@@ -102,12 +102,13 @@ protected[actions] trait PrimitiveActions {
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
waitForResponse: Option[FiniteDuration],
+ responseWithLogs: Boolean,
cause: Option[ActivationId])(implicit transid: TransactionId):
Future[Either[ActivationId, WhiskActivation]] = {
if (action.annotations.isTruthy(WhiskActivation.conductorAnnotation)) {
- invokeComposition(user, action, payload, waitForResponse, cause)
+ invokeComposition(user, action, payload, waitForResponse,
responseWithLogs, cause)
} else {
- invokeSimpleAction(user, action, payload, waitForResponse, cause)
+ invokeSimpleAction(user, action, payload, waitForResponse,
responseWithLogs, cause)
}
}
@@ -145,6 +146,7 @@ protected[actions] trait PrimitiveActions {
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
waitForResponse: Option[FiniteDuration],
+ responseWithLogs: Boolean,
cause: Option[ActivationId])(implicit transid: TransactionId):
Future[Either[ActivationId, WhiskActivation]] = {
// merge package parameters with action (action parameters supersede),
then merge in payload
@@ -168,6 +170,7 @@ protected[actions] trait PrimitiveActions {
activationId, // activation id created here
activeAckTopicIndex,
waitForResponse.isDefined,
+ responseWithLogs,
args,
cause = cause,
WhiskTracerProvider.tracer.getTraceContext(transid))
@@ -261,6 +264,7 @@ protected[actions] trait PrimitiveActions {
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
waitForResponse: Option[FiniteDuration],
+ responseWithLogs: Boolean,
cause: Option[ActivationId],
accounting: Option[CompositionAccounting] =
None)(
implicit transid: TransactionId): Future[Either[ActivationId,
WhiskActivation]] = {
@@ -325,6 +329,7 @@ protected[actions] trait PrimitiveActions {
action = session.action,
payload = params,
waitForResponse = Some(session.action.limits.timeout.duration +
1.minute), // wait for result
+ false,
cause = Some(session.activationId)) // cause is session id
waitForActivation(user, session, activationResponse).flatMap {
@@ -440,6 +445,7 @@ protected[actions] trait PrimitiveActions {
action,
payload,
waitForResponse = None, // not topmost, hence blocking, no need for
timeout
+ false,
cause = Some(session.activationId),
accounting = Some(session.accounting))
case Some(action) => // primitive action
@@ -449,6 +455,7 @@ protected[actions] trait PrimitiveActions {
action,
payload,
waitForResponse = Some(action.limits.timeout.duration + 1.minute),
+ false,
cause = Some(session.activationId))
case None => // sequence
session.accounting.components += 1
diff --git
a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
index d38a91ad1c..a54043cf93 100644
---
a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
+++
b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala
@@ -66,6 +66,7 @@ protected[actions] trait SequenceActions {
action: WhiskActionMetaData,
payload: Option[JsObject],
waitForResponse: Option[FiniteDuration],
+ responseWithLogs: Boolean,
cause: Option[ActivationId])(implicit transid: TransactionId):
Future[Either[ActivationId, WhiskActivation]]
/**
@@ -337,7 +338,7 @@ protected[actions] trait SequenceActions {
// this is an invoke for an atomic action
logging.debug(this, s"sequence invoking an enclosed atomic action
$action")
val timeout = action.limits.timeout.duration + 1.minute
- invokeAction(user, action, inputPayload, waitForResponse =
Some(timeout), cause) map {
+ invokeAction(user, action, inputPayload, waitForResponse =
Some(timeout), false, cause) map {
case res => (res, accounting.atomicActionCnt + 1)
}
}
diff --git
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 18f5ac303b..9f8018ff3c 100644
---
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -408,6 +408,7 @@ class InvokerActor(invokerInstance: InvokerInstanceId,
controllerInstance: Contr
activationId = new ActivationIdGenerator {}.make(),
rootControllerIndex = controllerInstance,
blocking = false,
+ blockingLogs = false,
content = None)
context.parent ! ActivationRequest(activationMessage, invokerInstance)
diff --git
a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
index 0ddd666d92..979984b587 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala
@@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and
could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- sendActiveAck: (TransactionId, WhiskActivation, Boolean,
ControllerInstanceId, UUID) => Future[Any],
+ sendActiveAck: (TransactionId, WhiskActivation, Boolean, Boolean,
ControllerInstanceId, UUID) => Future[Any],
storeActivation: (TransactionId, WhiskActivation, UserContext) =>
Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
@@ -167,6 +167,7 @@ class ContainerProxy(
transid,
activation,
job.msg.blocking,
+ false,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid)
storeActivation(transid, activation, context)
@@ -390,8 +391,10 @@ class ContainerProxy(
}
// Sending active ack. Entirely asynchronous and not waited upon.
- activation.foreach(
- sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex,
job.msg.user.namespace.uuid))
+ if (!job.msg.blockingLogs) {
+ activation.foreach(
+ sendActiveAck(tid, _, job.msg.blocking, false,
job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
+ }
val context = UserContext(job.msg.user)
@@ -418,8 +421,13 @@ class ContainerProxy(
}
}
- // Storing the record. Entirely asynchronous and not waited upon.
- activationWithLogs.map(_.fold(_.activation,
identity)).foreach(storeActivation(tid, _, context))
+ // Sending active ack and storing the record. Entirely asynchronous and
not waited upon.
+ activationWithLogs.map(_.fold(_.activation, identity)).foreach { act =>
+ if (job.msg.blockingLogs) {
+ sendActiveAck(tid, act, job.msg.blocking, true,
job.msg.rootControllerIndex, job.msg.user.namespace.uuid)
+ }
+ storeActivation(tid, act, context)
+ }
// Disambiguate activation errors and transform the Either into a
failed/successful Future respectively.
activationWithLogs.flatMap {
@@ -435,7 +443,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer:
FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) =>
Future[Container],
- ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID)
=> Future[Any],
+ ack: (TransactionId, WhiskActivation, Boolean, Boolean,
ControllerInstanceId, UUID) => Future[Any],
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container,
ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
diff --git
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index 5f4fd8db48..3f3f065d98 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -117,6 +117,7 @@ class InvokerReactive(
private val ack = (tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
+ responseWithLogs: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID) => {
implicit val transid: TransactionId = tid
@@ -130,6 +131,30 @@ class InvokerReactive(
s"posted ${if (recovery) "recovery" else "completion"} of
activation ${activationResult.activationId}")
}
}
+
+ def getTruncatedLogs(logs: ActivationLogs) = {
+ var totalLogSize = 0
+ val maxLogSize = 1024 * 4
+
+ ActivationLogs(
+ logs.logs.reverse
+ .map(log =>
+ if (totalLogSize < maxLogSize) {
+ if (log.size + totalLogSize <= maxLogSize) {
+ totalLogSize = totalLogSize + log.size
+ log
+ } else {
+ val l = s"...${log.substring(log.size - (maxLogSize -
totalLogSize))}"
+ totalLogSize = maxLogSize
+ l
+ }
+ } else {
+ ""
+ })
+ .filter(_.nonEmpty)
+ .reverse)
+ }
+
// Potentially sends activation metadata to kafka if user events are
enabled
UserEvents.send(
producer, {
@@ -155,7 +180,17 @@ class InvokerReactive(
activation.typeName)
})
- send(Right(if (blockingInvoke) activationResult else
activationResult.withoutLogsOrResult)).recoverWith {
+ val act = if (blockingInvoke) {
+ if (responseWithLogs) {
+ activationResult.withLogs(getTruncatedLogs(activationResult.logs))
+ } else {
+ activationResult.withoutLogs
+ }
+ } else {
+ activationResult.withoutLogsOrResult
+ }
+
+ send(Right(act)).recoverWith {
case t if t.getCause.isInstanceOf[RecordTooLargeException] =>
send(Left(activationResult.activationId), recovery = true)
}
@@ -241,7 +276,7 @@ class InvokerReactive(
val context = UserContext(msg.user)
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
- ack(msg.transid, activation, msg.blocking,
msg.rootControllerIndex, msg.user.namespace.uuid)
+ ack(msg.transid, activation, msg.blocking, false,
msg.rootControllerIndex, msg.user.namespace.uuid)
store(msg.transid, activation, context)
Future.successful(())
}
@@ -251,7 +286,7 @@ class InvokerReactive(
activationFeed ! MessageFeed.Processed
val activation =
generateFallbackActivation(msg,
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
- ack(msg.transid, activation, false, msg.rootControllerIndex,
msg.user.namespace.uuid)
+ ack(msg.transid, activation, false, false, msg.rootControllerIndex,
msg.user.namespace.uuid)
logging.warn(this, s"namespace ${msg.user.namespace.name} was
blocked in invoker.")
Future.successful(())
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
index 78317ad196..bc956dfb92 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala
@@ -80,6 +80,7 @@ class ContainerPoolTests
ActivationId.generate(),
ControllerInstanceId("0"),
blocking = false,
+ blockingLogs = false,
content = None)
Run(action, message)
}
diff --git
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
index 4a2a1331fc..f43188aed7 100644
---
a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
+++
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala
@@ -90,6 +90,7 @@ class ContainerProxyTests
ActivationId.generate(),
ControllerInstanceId("0"),
blocking = false,
+ blockingLogs = false,
content = None)
/*
@@ -141,7 +142,7 @@ class ContainerProxyTests
/** Creates an inspectable version of the ack method, which records all
calls in a buffer */
def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
- (_: TransactionId, activation: WhiskActivation, _: Boolean, _:
ControllerInstanceId, _: UUID) =>
+ (_: TransactionId, activation: WhiskActivation, _: Boolean, _: Boolean, _:
ControllerInstanceId, _: UUID) =>
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe
Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
diff --git
a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
index a0acf185c7..5b18d0baa0 100644
--- a/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/WebActionsApiTests.scala
@@ -261,6 +261,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon
with BeforeAndAfterEac
action: WhiskActionMetaData,
payload: Option[JsObject],
waitForResponse: Option[FiniteDuration],
+ responseWithLogs: Boolean,
cause: Option[ActivationId])(implicit transid: TransactionId):
Future[Either[ActivationId, WhiskActivation]] = {
invocationCount = invocationCount + 1
diff --git
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
index c8a8c0183f..31425fe2e7 100644
---
a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
+++
b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala
@@ -194,6 +194,7 @@ class InvokerSupervisionTests
activationId = new ActivationIdGenerator {}.make(),
rootControllerIndex = ControllerInstanceId("0"),
blocking = false,
+ blockingLogs = false,
content = None)
val msg = ActivationRequest(activationMessage, invokerInstance)
With regards,
Apache Git Services