This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 8be1505 add system config options for success / failure levels to
write blocking / non-blocking activations to db (#5169)
8be1505 is described below
commit 8be150577a12c54867d99f0dd82a74e9647863c0
Author: Brendan Doyle <[email protected]>
AuthorDate: Mon Jan 31 22:24:00 2022 -0800
add system config options for success / failure levels to write blocking /
non-blocking activations to db (#5169)
* add config option to disable successful non-blocking activation writes to
db
* fix check
* feedback
* fix tests
* rebase
* feedback
* update config precedence
* Apply suggestions from code review
Co-authored-by: rodric rabbah <[email protected]>
Co-authored-by: Brendan Doyle <[email protected]>
Co-authored-by: rodric rabbah <[email protected]>
---
common/scala/src/main/resources/application.conf | 9 ++
.../org/apache/openwhisk/core/WhiskConfig.scala | 3 +
.../openwhisk/core/database/ActivationStore.scala | 49 ++++++--
.../core/database/ActivationStoreLevel.scala | 29 +++++
.../core/database/ArtifactActivationStore.scala | 2 +-
.../ElasticSearchActivationStore.scala | 2 +-
.../core/database/memory/NoopActivationStore.scala | 11 +-
.../openwhisk/core/controller/Triggers.scala | 2 +-
.../core/controller/actions/PrimitiveActions.scala | 5 +-
.../core/controller/actions/SequenceActions.scala | 2 +-
.../core/loadBalancer/FPCPoolBalancer.scala | 3 +-
.../core/invoker/FPCInvokerReactive.scala | 2 +-
.../openwhisk/core/invoker/InvokerReactive.scala | 2 +-
.../test/ActionsApiWithDbPollingTests.scala | 7 +-
.../core/controller/test/ActivationsApiTests.scala | 125 +++++++++++++++++----
.../controller/test/ControllerTestCommon.scala | 22 ++--
16 files changed, 220 insertions(+), 55 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index ff5f8e2..6c69e11 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -438,10 +438,19 @@ whisk {
# uniqueName + displayName 253 (max pod name length in
Kube)
serdes-overhead = 6068 // 3034 bytes of metadata * 2 for extra headroom
+ # DEPRECATED, use store-blocking-result-level
# Disables database store for blocking + successful activations
# invocations made with `X-OW-EXTRA-LOGGING: on` header, will force
the activation to be stored
disable-store-result = false
+ # Result level to store in db for blocking activations (STORE_ALWAYS,
STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS)
+ # invocations made with `X-OW-EXTRA-LOGGING: on` header, will force
the activation to be stored
+ store-blocking-result-level = "STORE_ALWAYS"
+
+ # Result level to store in db for non-blocking activations
(STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS)
+ # invocations made with `X-OW-EXTRA-LOGGING: on` header, will force
the activation to be stored
+ store-non-blocking-result-level = "STORE_ALWAYS"
+
# Enable metadata logging of activations not stored in the database
unstored-logs-enabled = false
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index e50fbb4..2a6490b 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -286,7 +286,10 @@ object ConfigKeys {
val sharedPackageExecuteOnly = s"whisk.shared-packages-execute-only"
val swaggerUi = "whisk.swagger-ui"
+ /* DEPRECATED: disableStoreResult is deprecated in favor of storeBlockingResultLevel
*/
val disableStoreResult = s"$activation.disable-store-result"
+ val storeBlockingResultLevel = s"$activation.store-blocking-result-level"
+ val storeNonBlockingResultLevel =
s"$activation.store-non-blocking-result-level"
val unstoredLogsEnabled = s"$activation.unstored-logs-enabled"
val apacheClientConfig = "whisk.apache-client"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
index 8365150..9981bff 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStore.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.database
import java.time.Instant
-
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import spray.json.JsObject
@@ -33,8 +32,24 @@ import scala.concurrent.Future
case class UserContext(user: Identity, request: HttpRequest = HttpRequest())
trait ActivationStore {
+ val logging: Logging
+ /* DEPRECATED: disableStoreResult config is now deprecated and replaced with the
blocking activation store level (storeBlockingResultLevel) */
protected val disableStoreResultConfig =
loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
+ protected val storeBlockingResultLevelConfig = {
+ try {
+
ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeBlockingResultLevel))
+ } catch {
+ case _: Exception =>
+ val disableStoreResultConfig =
loadConfigOrThrow[Boolean](ConfigKeys.disableStoreResult)
+ logging.warn(
+ this,
+ s"The config ${ConfigKeys.disableStoreResult} being used is
deprecated. Please use the replacement config
${ConfigKeys.storeBlockingResultLevel}")
+ if (disableStoreResultConfig) ActivationStoreLevel.STORE_FAILURES else
ActivationStoreLevel.STORE_ALWAYS
+ }
+ }
+ protected val storeNonBlockingResultLevelConfig =
+
ActivationStoreLevel.valueOf(loadConfigOrThrow[String](ConfigKeys.storeNonBlockingResultLevel))
protected val unstoredLogsEnabledConfig =
loadConfigOrThrow[Boolean](ConfigKeys.unstoredLogsEnabled)
/**
@@ -42,6 +57,8 @@ trait ActivationStore {
*
* @param activation activation to store
* @param isBlockingActivation is activation blocking
+ * @param blockingStoreLevel store level override for blocking activations;
the configured blocking level is used when None
+ * @param nonBlockingStoreLevel store level override for non-blocking activations;
the configured non-blocking level is used when None
* @param context user and request context
* @param transid transaction ID for request
* @param notifier cache change notifier
@@ -49,16 +66,18 @@ trait ActivationStore {
*/
def storeAfterCheck(activation: WhiskActivation,
isBlockingActivation: Boolean,
- disableStore: Option[Boolean],
+ blockingStoreLevel: Option[ActivationStoreLevel.Value],
+ nonBlockingStoreLevel:
Option[ActivationStoreLevel.Value],
context: UserContext)(implicit transid: TransactionId,
notifier:
Option[CacheChangeNotification],
logging: Logging): Future[DocInfo]
= {
if (context.user.limits.storeActivations.getOrElse(true) &&
shouldStoreActivation(
- activation.response.isSuccess,
+ activation.response,
isBlockingActivation,
transid.meta.extraLogging,
- disableStore.getOrElse(disableStoreResultConfig))) {
+ blockingStoreLevel.getOrElse(storeBlockingResultLevelConfig),
+ nonBlockingStoreLevel.getOrElse(storeNonBlockingResultLevelConfig)))
{
store(activation, context)
} else {
@@ -183,17 +202,29 @@ trait ActivationStore {
* - an activation in debug mode
* - activation stores is not disabled via a configuration parameter
*
- * @param isSuccess is successful activation
+ * @param activationResponse to check
* @param isBlocking is blocking activation
* @param debugMode is logging header set to "on" for the invocation
- * @param disableStore is disable store configured
+ * @param blockingStoreLevel level of activation status to store for
blocking invocations
+ * @param nonBlockingStoreLevel level of activation status to store for
non-blocking invocations
* @return Should the activation be stored to the database
*/
- private def shouldStoreActivation(isSuccess: Boolean,
+ private def shouldStoreActivation(activationResponse: ActivationResponse,
isBlocking: Boolean,
debugMode: Boolean,
- disableStore: Boolean): Boolean = {
- !isSuccess || !isBlocking || debugMode || !disableStore
+ blockingStoreLevel:
ActivationStoreLevel.Value,
+ nonBlockingStoreLevel:
ActivationStoreLevel.Value): Boolean = {
+ def shouldStoreOnLevel(storageLevel: ActivationStoreLevel.Value): Boolean
= {
+ storageLevel match {
+ case ActivationStoreLevel.STORE_ALWAYS => true
+ case ActivationStoreLevel.STORE_FAILURES =>
!activationResponse.isSuccess
+ case ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS =>
+ activationResponse.isContainerError ||
activationResponse.isWhiskError
+ }
+ }
+
+ debugMode || (isBlocking && shouldStoreOnLevel(blockingStoreLevel)) ||
(!isBlocking && shouldStoreOnLevel(
+ nonBlockingStoreLevel))
}
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStoreLevel.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStoreLevel.scala
new file mode 100644
index 0000000..7f37906
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ActivationStoreLevel.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database
+
+object ActivationStoreLevel extends Enumeration {
+ type ActivationStoreLevel = Value
+ val STORE_ALWAYS, STORE_FAILURES, STORE_FAILURES_NOT_APPLICATION_ERRORS =
Value
+
+ def valueOf(value: String): Value = {
+ values
+ .find(_.toString == value.toUpperCase())
+ .getOrElse(throw new IllegalArgumentException(s"Invalid log level:
$value"))
+ }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
index 1e2ab88..9b62f72 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactActivationStore.scala
@@ -27,7 +27,7 @@ import org.apache.openwhisk.core.entity._
import scala.concurrent.Future
import scala.util.{Failure, Success}
-class ArtifactActivationStore(actorSystem: ActorSystem, logging: Logging)
extends ActivationStore {
+class ArtifactActivationStore(actorSystem: ActorSystem, override val logging:
Logging) extends ActivationStore {
implicit val executionContext = actorSystem.dispatcher
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
index 39f5a0c..5050426 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/elasticsearch/ElasticSearchActivationStore.scala
@@ -57,7 +57,7 @@ case class ElasticSearchActivationStoreConfig(protocol:
String,
class ElasticSearchActivationStore(
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]),
(Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
elasticSearchConfig: ElasticSearchActivationStoreConfig,
- useBatching: Boolean = false)(implicit actorSystem: ActorSystem, logging:
Logging)
+ useBatching: Boolean = false)(implicit actorSystem: ActorSystem, override
val logging: Logging)
extends ActivationStore {
import com.sksamuel.elastic4s.http.ElasticDsl._
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
index 7b50685..8d45ff0 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/memory/NoopActivationStore.scala
@@ -18,21 +18,16 @@
package org.apache.openwhisk.core.database.memory
import java.time.Instant
-
import akka.actor.ActorSystem
-import org.apache.openwhisk.common.{Logging, TransactionId, WhiskInstants}
-import org.apache.openwhisk.core.database.{
- ActivationStore,
- ActivationStoreProvider,
- CacheChangeNotification,
- UserContext
-}
+import org.apache.openwhisk.common.{Logging, PrintStreamLogging,
TransactionId, WhiskInstants}
+import org.apache.openwhisk.core.database.{ActivationStore,
ActivationStoreProvider, CacheChangeNotification, UserContext}
import org.apache.openwhisk.core.entity.{ActivationId, DocInfo, EntityName,
EntityPath, Subject, WhiskActivation}
import spray.json.{JsNumber, JsObject}
import scala.concurrent.Future
object NoopActivationStore extends ActivationStore with WhiskInstants {
+ override val logging = new PrintStreamLogging()
private val emptyInfo = DocInfo("foo")
private val emptyCount = JsObject("activations" -> JsNumber(0))
private val dummyActivation = WhiskActivation(
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
index c70caee..5751099 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Triggers.scala
@@ -161,7 +161,7 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
triggerActivation
}
.map { activation =>
- activationStore.storeAfterCheck(activation, false, None,
context)
+ activationStore.storeAfterCheck(activation, false, None,
None, context)
}
respondWithActivationIdHeader(triggerActivationId) {
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
index 621a10e..19ca12e 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala
@@ -600,7 +600,10 @@ protected[actions] trait PrimitiveActions {
}
}
- activationStore.storeAfterCheck(activation, blockingComposition, None,
context)(transid, notifier = None, logging)
+ activationStore.storeAfterCheck(activation, blockingComposition, None,
None, context)(
+ transid,
+ notifier = None,
+ logging)
activation
}
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
index 619ccdb..0bc082b 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala
@@ -181,7 +181,7 @@ protected[actions] trait SequenceActions {
}
}
- activationStore.storeAfterCheck(seqActivation, blockingSequence,
None, context)(
+ activationStore.storeAfterCheck(seqActivation, blockingSequence,
None, None, context)(
transid,
notifier = None,
logging)
diff --git
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
index 37c9ffb..2280ce7 100644
---
a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
+++
b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala
@@ -366,7 +366,8 @@ class FPCPoolBalancer(config: WhiskConfig,
// and complete the promise with a failure if necessary
activationPromises
.remove(aid)
- .foreach(_.tryFailure(new Throwable("Activation entry has timed
out, no completion or active ack received yet")))
+ .foreach(
+ _.tryFailure(new Throwable("Activation entry has timed out, no
completion or active ack received yet")))
}
// Active acks that are received here are strictly from user actions -
health actions are not part of
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
index 8a0fdc4..8158fd3 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/FPCInvokerReactive.scala
@@ -178,7 +178,7 @@ class FPCInvokerReactive(config: WhiskConfig,
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation,
isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
- activationStore.storeAfterCheck(activation, isBlocking, None,
context)(tid, notifier = None, logging)
+ activationStore.storeAfterCheck(activation, isBlocking, None, None,
context)(tid, notifier = None, logging)
}
private def healthActivationClientFactory(f: ActorRefFactory,
diff --git
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index f2d36b1..e32ece2 100644
---
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -143,7 +143,7 @@ class InvokerReactive(
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation,
isBlocking: Boolean, context: UserContext) => {
implicit val transid: TransactionId = tid
- activationStore.storeAfterCheck(activation, isBlocking, None,
context)(tid, notifier = None, logging)
+ activationStore.storeAfterCheck(activation, isBlocking, None, None,
context)(tid, notifier = None, logging)
}
/** Creates a ContainerProxy Actor when being called. */
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
index 962b5da..cc391a1 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiWithDbPollingTests.scala
@@ -18,14 +18,13 @@
package org.apache.openwhisk.core.controller.test
import java.time.Instant
-
import
akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonUnmarshaller
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.core.controller.WhiskActionsApi
import org.apache.openwhisk.core.controller.actions.ControllerActivationConfig
-import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.database.{ActivationStoreLevel, UserContext}
import org.apache.openwhisk.core.entity._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -79,7 +78,7 @@ class ActionsApiWithDbPollingTests extends
ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve
it
// the test harness makes sure the activation id observed by the test
matches
// the one generated by the api handler
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -114,7 +113,7 @@ class ActionsApiWithDbPollingTests extends
ControllerTestCommon with WhiskAction
// storing the activation in the db will allow the db polling to retrieve
it
// the test harness makes sure the activation id observed by the test
matches
// the one generated by the api handler
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
try {
Post(s"$collectionPath/${action.name}?blocking=true") ~>
Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
index a243397..5e266dd 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActivationsApiTests.scala
@@ -18,7 +18,6 @@
package org.apache.openwhisk.core.controller.test
import java.time.{Clock, Instant}
-
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.server.Route
@@ -31,7 +30,7 @@ import org.apache.openwhisk.core.entitlement.Collection
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.{ErrorResponse, Messages}
-import org.apache.openwhisk.core.database.UserContext
+import org.apache.openwhisk.core.database.{ActivationStoreLevel, UserContext}
/**
* Tests Activations API.
@@ -95,7 +94,8 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
end = Instant.now)
}.toList
try {
- (notExpectedActivations ++ activations).foreach(storeActivation(_,
false, false, context))
+ (notExpectedActivations ++ activations).foreach(
+ storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context))
waitOnListActivationsInNamespace(namespace, 2, context)
org.apache.openwhisk.utils.retry {
@@ -179,7 +179,8 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
}.toList
try {
- (notExpectedActivations ++ activations).foreach(storeActivation(_,
false, false, context))
+ (notExpectedActivations ++ activations).foreach(
+ storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context))
waitOnListActivationsInNamespace(namespace, 2, context)
checkCount("", 2)
@@ -254,7 +255,8 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
end = now.plusSeconds(30))) // should match
try {
- (notExpectedActivations ++ activations).foreach(storeActivation(_,
false, false, context))
+ (notExpectedActivations ++ activations).foreach(
+ storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context))
waitOnListActivationsInNamespace(namespace, activations.length, context)
{ // get between two time stamps
@@ -363,7 +365,8 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
annotations = Parameters("path", s"${namespace.asString}/pkg/xyz"))
}.toList
try {
- (notExpectedActivations ++ activations ++
activationsInPackage).foreach(storeActivation(_, false, false, context))
+ (notExpectedActivations ++ activations ++ activationsInPackage).foreach(
+ storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context))
waitOnListActivationsMatchingName(namespace, EntityPath("xyz"),
activations.length, context)
waitOnListActivationsMatchingName(
namespace,
@@ -479,7 +482,8 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
}.toList
try {
- activations.foreach(storeActivation(_, false, false, context))
+ activations.foreach(
+ storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context))
waitOnListActivationsInNamespace(namespace, activations.size, context)
Get(s"$collectionPath?skip=1") ~> Route.seal(routes(creds)) ~> check {
@@ -503,7 +507,8 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
}.toList
try {
- activations.foreach(storeActivation(_, false, false, context))
+ activations.foreach(
+ storeActivation(_, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context))
waitOnListActivationsInNamespace(namespace, activations.size, context)
Get(s"$collectionPath?limit=1") ~> Route.seal(routes(creds)) ~> check {
@@ -533,7 +538,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
Get(s"$collectionPath/${activation.activationId.asString}") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -570,7 +575,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -583,7 +588,26 @@ class ActivationsApiTests extends ControllerTestCommon
with WhiskActivationsApi
}
//// GET /activations/id/result when db store is disabled
- it should "return activation empty when db store is disabled" in {
+ it should "return activation empty when db store is set to failures for
successful blocking" in {
+ implicit val tid = transid()
+ val activation =
+ WhiskActivation(
+ namespace,
+ aname(),
+ creds.subject,
+ ActivationId.generate(),
+ start = Instant.now,
+ end = Instant.now)
+
+ storeActivation(activation, true, ActivationStoreLevel.STORE_FAILURES,
ActivationStoreLevel.STORE_ALWAYS, context)
+
+ Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
+ status should be(NotFound)
+ }
+ }
+
+ //// GET /activations/id/result when non-blocking db store is set to failures
+ it should "return activation empty when db store is set to failures for
non-blocking" in {
implicit val tid = transid()
val activation =
WhiskActivation(
@@ -594,7 +618,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
start = Instant.now,
end = Instant.now)
- storeActivation(activation, true, true, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_FAILURES, context)
Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
status should be(NotFound)
@@ -602,7 +626,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
}
//// GET /activations/id/result when store is disabled and activation is not
blocking
- it should "get activation result by id when db store is disabled and
activation is not blocking" in {
+ it should "get activation result by id when db store is disabled for
successful blocking and activation is not blocking" in {
implicit val tid = transid()
val activation =
WhiskActivation(
@@ -613,7 +637,12 @@ class ActivationsApiTests extends ControllerTestCommon
with WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, false, true, context)
+ storeActivation(
+ activation,
+ false,
+ ActivationStoreLevel.STORE_FAILURES,
+ ActivationStoreLevel.STORE_ALWAYS,
+ context)
Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -626,7 +655,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
}
//// GET /activations/id/result when store is disabled and activation is
unsuccessful
- it should "get activation result by id when db store is disabled and
activation is unsuccessful" in {
+ it should "get activation result by id when db store is set to failures and
activation is unsuccessful" in {
implicit val tid = transid()
val activation =
WhiskActivation(
@@ -638,7 +667,65 @@ class ActivationsApiTests extends ControllerTestCommon
with WhiskActivationsApi
end = Instant.now,
response = ActivationResponse.whiskError("activation error"))
try {
- storeActivation(activation, true, true, context)
+ storeActivation(
+ activation,
+ true,
+ ActivationStoreLevel.STORE_FAILURES,
+ ActivationStoreLevel.STORE_FAILURES,
+ context)
+
+ Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
+ status should be(OK)
+ val response = responseAs[JsObject]
+ response should be(activation.response.toExtendedJson)
+ }
+ } finally {
+ deleteActivation(ActivationId(activation.docid.asString), context)
+ }
+ }
+
+ it should "return activation empty when db store is set to not application
failures and activation is application failure" in {
+ implicit val tid = transid()
+ val activation =
+ WhiskActivation(
+ namespace,
+ aname(),
+ creds.subject,
+ ActivationId.generate(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.applicationError("activation error"))
+
+ storeActivation(
+ activation,
+ true,
+ ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS,
+ ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS,
+ context)
+
+ Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
+ status should be(NotFound)
+ }
+ }
+
+ it should "get activation result by id when db store is set to not
application failures failures and activation is unsuccessful" in {
+ implicit val tid = transid()
+ val activation =
+ WhiskActivation(
+ namespace,
+ aname(),
+ creds.subject,
+ ActivationId.generate(),
+ start = Instant.now,
+ end = Instant.now,
+ response = ActivationResponse.whiskError("activation error"))
+ try {
+ storeActivation(
+ activation,
+ true,
+ ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS,
+ ActivationStoreLevel.STORE_FAILURES_NOT_APPLICATION_ERRORS,
+ context)
Get(s"$collectionPath/${activation.activationId.asString}/result") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -662,7 +749,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
start = Instant.now,
end = Instant.now)
try {
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
Get(s"$collectionPath/${activation.activationId.asString}/logs") ~>
Route.seal(routes(creds)) ~> check {
status should be(OK)
@@ -685,7 +772,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
ActivationId.generate(),
start = Instant.now,
end = Instant.now)
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
try {
Get(s"$collectionPath/${activation.activationId.asString}/bogus") ~>
Route.seal(routes(creds)) ~> check {
@@ -758,7 +845,7 @@ class ActivationsApiTests extends ControllerTestCommon with
WhiskActivationsApi
val activation =
new BadActivation(namespace, aname(), creds.subject,
ActivationId.generate(), Instant.now, Instant.now)
- storeActivation(activation, false, false, context)
+ storeActivation(activation, false, ActivationStoreLevel.STORE_ALWAYS,
ActivationStoreLevel.STORE_ALWAYS, context)
Get(s"$collectionPath/${activation.activationId}") ~>
Route.seal(routes(creds)) ~> check {
status should be(InternalServerError)
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
index 95c527c..13f9572 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala
@@ -34,14 +34,19 @@ import org.apache.openwhisk.core.{FeatureFlags, WhiskConfig}
import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.controller.{CustomHeaders, RestApiCommons,
WhiskServices}
-import org.apache.openwhisk.core.database.{ActivationStoreProvider,
CacheChangeNotification, DocumentFactory}
+import org.apache.openwhisk.core.database.{
+ ActivationStoreLevel,
+ ActivationStoreProvider,
+ CacheChangeNotification,
+ DocumentFactory,
+ UserContext
+}
import org.apache.openwhisk.core.database.test.DbUtils
import org.apache.openwhisk.core.entitlement._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.test.ExecHelpers
import org.apache.openwhisk.core.loadBalancer.LoadBalancer
import org.apache.openwhisk.spi.SpiLoader
-import org.apache.openwhisk.core.database.UserContext
protected trait ControllerTestCommon
extends FlatSpec
@@ -127,12 +132,15 @@ protected trait ControllerTestCommon
def storeActivation(
activation: WhiskActivation,
isBlockingActivation: Boolean,
- disableStore: Boolean,
+ blockingStoreLevel: ActivationStoreLevel.Value,
+ nonBlockingStoreLevel: ActivationStoreLevel.Value,
context: UserContext)(implicit transid: TransactionId, timeout: Duration =
10 seconds): DocInfo = {
- val docFuture = activationStore.storeAfterCheck(activation,
isBlockingActivation, Some(disableStore), context)(
- transid,
- notifier = None,
- logging)
+ val docFuture = activationStore.storeAfterCheck(
+ activation,
+ isBlockingActivation,
+ Some(blockingStoreLevel),
+ Some(nonBlockingStoreLevel),
+ context)(transid, notifier = None, logging)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
doc