This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 1bd21bc Ensure cache gets properly updated with concurrent access for
action with attachments (#4183)
1bd21bc is described below
commit 1bd21bccc7ad90b0081add8e1a9cd736dc60ad47
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Dec 18 00:46:23 2018 +0530
Ensure cache gets properly updated with concurrent access for action with
attachments (#4183)
---
.../openwhisk/core/database/DocumentFactory.scala | 155 +++++++--------------
.../apache/openwhisk/core/entity/WhiskAction.scala | 5 +-
.../core/controller/test/ActionsApiTests.scala | 57 +++++++-
3 files changed, 109 insertions(+), 108 deletions(-)
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
index b32ec9a..75f2d70 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
@@ -21,9 +21,6 @@ import java.io.InputStream
import java.io.OutputStream
import scala.concurrent.{Future, Promise}
-import scala.util.Failure
-import scala.util.Success
-import scala.util.Try
import akka.http.scaladsl.model.ContentType
import akka.stream.IOResult
import akka.stream.scaladsl.StreamConverters
@@ -102,24 +99,12 @@ trait DocumentFactory[W <: DocumentRevisionProvider]
extends MultipleReadersSing
def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W, old: Option[W])(
implicit transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
- Try {
- require(db != null, "db undefined")
- require(doc != null, "doc undefined")
- } map { _ =>
- implicit val logger = db.logging
- implicit val ec = db.executionContext
-
- val key = CacheKey(doc)
- val docInfo = doc.docinfo
-
- cacheUpdate(doc, key, db.put(doc) map { newDocInfo =>
- doc.revision[W](newDocInfo.rev)
- doc.docinfo
- })
- } match {
- case Success(f) => f
- case Failure(t) => Future.failed(t)
- }
+ implicit val logger = db.logging
+ implicit val ec = db.executionContext
+ cacheUpdate(doc, CacheKey(doc), db.put(doc) map { newDocInfo =>
+ doc.revision[W](newDocInfo.rev)
+ doc.docinfo
+ })
}
def putAndAttach[Wsuper >: W](db: ArtifactStore[Wsuper],
@@ -131,49 +116,31 @@ trait DocumentFactory[W <: DocumentRevisionProvider]
extends MultipleReadersSing
postProcess: Option[W => W] = None)(
implicit transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
+ implicit val logger = db.logging
+ implicit val ec = db.executionContext
- Try {
- require(db != null, "db undefined")
- require(doc != null, "doc undefined")
- } map { _ =>
- implicit val logger = db.logging
- implicit val ec = db.executionContext
-
- val key = CacheKey(doc)
- val src = StreamConverters.fromInputStream(() => bytes)
-
- val p = Promise[W]
- cacheUpdate(p.future, key, db.putAndAttach[W](doc, update, contentType,
src, oldAttachment) map {
- case (newDocInfo, attached) =>
- val newDoc = update(doc, attached)
- val cacheDoc = postProcess map { _(newDoc) } getOrElse newDoc
- cacheDoc.revision[W](newDocInfo.rev)
- p.success(cacheDoc)
- newDocInfo
- })
-
- } match {
- case Success(f) => f
- case Failure(t) => Future.failed(t)
- }
+ val key = CacheKey(doc)
+ val src = StreamConverters.fromInputStream(() => bytes)
+
+ val p = Promise[W]
+ cacheUpdate(p.future, key, db.putAndAttach[W](doc, update, contentType,
src, oldAttachment) map {
+ case (newDocInfo, attached) =>
+ val newDoc = update(doc, attached)
+ val cacheDoc = postProcess map { _(newDoc) } getOrElse newDoc
+ cacheDoc.revision[W](newDocInfo.rev)
+ p.success(cacheDoc)
+ newDocInfo
+ })
}
def del[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)(
implicit transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[Boolean] = {
- Try {
- require(db != null, "db undefined")
- require(doc != null, "doc undefined")
- } map { _ =>
- implicit val logger = db.logging
- implicit val ec = db.executionContext
+ implicit val logger = db.logging
+ implicit val ec = db.executionContext
- val key = CacheKey(doc.id.asDocInfo)
- cacheInvalidate(key, db.del(doc))
- } match {
- case Success(f) => f
- case Failure(t) => Future.failed(t)
- }
+ val key = CacheKey(doc.id.asDocInfo)
+ cacheInvalidate(key, db.del(doc))
}
/**
@@ -198,77 +165,57 @@ trait DocumentFactory[W <: DocumentRevisionProvider]
extends MultipleReadersSing
doc: DocId,
rev: DocRevision = DocRevision.empty,
fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw:
Manifest[W]): Future[W] = {
- getWithAttachment(db, doc, rev, fromCache, None)
+ implicit val logger = db.logging
+ implicit val ec = db.executionContext
+ val key = doc.asDocInfo(rev)
+ cacheLookup(CacheKey(key), db.get[W](key, None), fromCache)
}
+ /**
+ * Fetches a document along with its attachment. `postProcess` is used to
process the fetched document
+ * before adding it to the cache. This ensures that for documents having an
attachment the cache is updated only
+ * after the attachment has been fetched.
+ */
protected def getWithAttachment[Wsuper >: W](
db: ArtifactStore[Wsuper],
doc: DocId,
rev: DocRevision = DocRevision.empty,
fromCache: Boolean,
- attachmentHandler: Option[(W, Attached) => W])(implicit transid:
TransactionId, mw: Manifest[W]): Future[W] = {
- Try {
- require(db != null, "db undefined")
- } map {
- implicit val logger = db.logging
- implicit val ec = db.executionContext
- val key = doc.asDocInfo(rev)
- _ =>
- cacheLookup(CacheKey(key), db.get[W](key, attachmentHandler),
fromCache)
- } match {
- case Success(f) => f
- case Failure(t) => Future.failed(t)
- }
+ attachmentHandler: (W, Attached) => W,
+ postProcess: W => Future[W])(implicit transid: TransactionId, mw:
Manifest[W]): Future[W] = {
+ implicit val logger = db.logging
+ implicit val ec = db.executionContext
+ val key = doc.asDocInfo(rev)
+ cacheLookup(CacheKey(key), db.get[W](key,
Some(attachmentHandler)).flatMap(postProcess), fromCache)
}
- def getAttachment[Wsuper >: W](
+ protected def getAttachment[Wsuper >: W](
db: ArtifactStore[Wsuper],
doc: W,
attached: Attached,
outputStream: OutputStream,
postProcess: Option[W => W] = None)(implicit transid: TransactionId, mw:
Manifest[W]): Future[W] = {
-
implicit val ec = db.executionContext
implicit val notifier: Option[CacheChangeNotification] = None
+ implicit val logger = db.logging
- Try {
- require(db != null, "db defined")
- require(doc != null, "doc undefined")
- } map { _ =>
- implicit val logger = db.logging
- implicit val ec = db.executionContext
-
- val docInfo = doc.docinfo
- val key = CacheKey(docInfo)
- val sink = StreamConverters.fromOutputStream(() => outputStream)
+ val docInfo = doc.docinfo
+ val key = CacheKey(docInfo)
+ val sink = StreamConverters.fromOutputStream(() => outputStream)
- db.readAttachment[IOResult](docInfo, attached, sink).map {
- case _ =>
- val cacheDoc = postProcess map { _(doc) } getOrElse doc
+ db.readAttachment[IOResult](docInfo, attached, sink).map { _ =>
+ val cacheDoc = postProcess.map(_(doc)).getOrElse(doc)
- cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map {
newDocInfo =>
- cacheDoc.revision[W](newDocInfo.rev)
- }
- cacheDoc
+ cacheUpdate(cacheDoc, key, Future.successful(docInfo)) map { newDocInfo
=>
+ cacheDoc.revision[W](newDocInfo.rev)
}
-
- } match {
- case Success(f) => f
- case Failure(t) => Future.failed(t)
+ cacheDoc
}
}
def deleteAttachments[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)(
implicit transid: TransactionId): Future[Boolean] = {
- Try {
- require(db != null, "db defined")
- require(doc != null, "doc undefined")
- } map { _ =>
- implicit val ec = db.executionContext
- db.deleteAttachments(doc)
- } match {
- case Success(f) => f
- case Failure(t) => Future.failed(t)
- }
+ implicit val ec = db.executionContext
+ db.deleteAttachments(doc)
}
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
index 65ddcf3..38d1a2f 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
@@ -376,9 +376,7 @@ object WhiskAction extends DocumentFactory[WhiskAction]
with WhiskEntityQueries[
implicit val ec = db.executionContext
- val fa = super.getWithAttachment(db, doc, rev, fromCache,
Some(attachmentHandler _))
-
- fa.flatMap { action =>
+ val inlineActionCode: WhiskAction => Future[WhiskAction] = { action =>
def getWithAttachment(attached: Attached, binary: Boolean, exec:
AttachedCode) = {
val boas = new ByteArrayOutputStream()
val wrapped = if (binary) Base64.getEncoder().wrap(boas) else boas
@@ -400,6 +398,7 @@ object WhiskAction extends DocumentFactory[WhiskAction]
with WhiskEntityQueries[
Future.successful(action)
}
}
+ super.getWithAttachment(db, doc, rev, fromCache, attachmentHandler,
inlineActionCode)
}
def attachmentHandler(action: WhiskAction, attached: Attached): WhiskAction
= {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
index d2d6387..ce8a1bf 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ActionsApiTests.scala
@@ -36,8 +36,8 @@ import org.apache.openwhisk.core.entitlement.Collection
import org.apache.openwhisk.http.ErrorResponse
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
-
import akka.http.scaladsl.model.headers.RawHeader
+import org.apache.commons.lang3.StringUtils
import org.apache.openwhisk.core.entity.Attachments.Inline
/**
@@ -1027,6 +1027,61 @@ class ActionsApiTests extends ControllerTestCommon with
WhiskActionsApi {
}
}
+ it should "concurrently get an action with attachment that is not cached" in
{
+ implicit val tid = transid()
+ val action = WhiskAction(namespace, aname(),
jsDefault(nonInlinedCode(entityStore)), Parameters("x", "b"))
+ val kind = NODEJS6
+
+ val content = WhiskActionPut(
+ Some(action.exec),
+ Some(action.parameters),
+ Some(
+ ActionLimitsOption(
+ Some(action.limits.timeout),
+ Some(action.limits.memory),
+ Some(action.limits.logs),
+ Some(action.limits.concurrency))))
+ val name = action.name
+ val cacheKey = s"${CacheKey(action)}".replace("(", "\\(").replace(")",
"\\)")
+ val expectedGetLog = Seq(
+ s"finding document: 'id: ${action.namespace}/${action.name}",
+ s"finding attachment '[\\w-/:]+' of document 'id:
${action.namespace}/${action.name}").mkString("(?s).*")
+
+ Put(s"$collectionPath/$name", content) ~>
Route.seal(routes(creds)(transid())) ~> check {
+ status should be(OK)
+ }
+
+ removeFromCache(action, WhiskAction)
+
+ stream.reset()
+
+ val expectedAction = WhiskAction(
+ action.namespace,
+ action.name,
+ action.exec,
+ action.parameters,
+ action.limits,
+ action.version,
+ action.publish,
+ action.annotations ++ Parameters(WhiskAction.execFieldName, kind))
+
+ (0 until 5).par.map { i =>
+ Get(s"$collectionPath/$name") ~> Route.seal(routes(creds)(transid())) ~>
check {
+ status should be(OK)
+ val response = responseAs[WhiskAction]
+ response should be(expectedAction)
+ }
+ }
+
+ // Loading an action with attachment concurrently should load the attachment
only once
+ val logs = stream.toString
+ withClue(s"db logs $logs") {
+ StringUtils.countMatches(logs, "finding document") shouldBe 1
+ StringUtils.countMatches(logs, "finding attachment") shouldBe 1
+ }
+ stream.reset()
+ }
+
it should "update an existing action with attachment that is not cached" in {
implicit val tid = transid()
val nodeAction = WhiskAction(namespace, aname(),
jsDefault(nonInlinedCode(entityStore)), Parameters("x", "b"))