This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new e7253ac Enable soft delete mode for documents in CosmosDB (#4339)
e7253ac is described below
commit e7253ac5bac519df289321a22c5ee923bb96c1f7
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Fri Mar 22 10:25:27 2019 +0530
Enable soft delete mode for documents in CosmosDB (#4339)
* Enable soft deletion of documents in CosmosDB
---
common/scala/src/main/resources/application.conf | 5 +
.../database/cosmosdb/CosmosDBArtifactStore.scala | 163 ++++++++++++++-------
.../core/database/cosmosdb/CosmosDBConfig.scala | 3 +-
.../core/database/cosmosdb/CosmosDBUtil.scala | 41 ++++++
.../database/cosmosdb/CosmosDBViewMapper.scala | 17 ++-
.../cosmosdb/CosmosDBSoftDeleteTests.scala | 43 ++++++
.../cosmosdb/CosmosDBStoreBehaviorBase.scala | 4 +-
.../test/behavior/ArtifactStoreCRUDBehaviors.scala | 38 ++++-
.../behavior/ArtifactStoreQueryBehaviors.scala | 18 +++
9 files changed, 272 insertions(+), 60 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 435dc94..c176478 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -201,6 +201,11 @@ whisk {
# Specifies the current clusterId whose value is recorded with
document upon any update
# to indicate which cluster made the change. By default no such value
is recorded
# cluster-id =
+
+ # Enables soft delete mode whereby the document would not be actually
deleted. Instead
+ # it would be marked deleted by setting `_deleted` property to true
and then actual delete
+ # happens via TTL.
+ # soft-delete-ttl = 10 h
connection-policy {
max-pool-size = 1000
# When the value of this property is true, the SDK will direct
write operations to
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index d7bdee1..c833ae6 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -24,6 +24,7 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.microsoft.azure.cosmosdb._
+import com.microsoft.azure.cosmosdb.internal.Constants.Properties
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import kamon.metric.MeasurementUnit
import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers,
MetricEmitter, TransactionId}
@@ -34,7 +35,7 @@ import
org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
import org.apache.openwhisk.core.entity.Attachments.Attached
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.http.Messages
-import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue,
RootJsonFormat, _}
+import spray.json._
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
@@ -64,14 +65,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
protected val client: AsyncDocumentClient = clientRef.get.client
private[cosmosdb] val (database, collection) = initialize()
- private val _id = "_id"
- private val _rev = "_rev"
-
private val putToken = createToken("put", read = false)
private val delToken = createToken("del", read = false)
private val getToken = createToken("get")
private val queryToken = createToken("query")
private val countToken = createToken("count")
+ private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt)
private val clusterIdValue = config.clusterId.map(JsString(_))
@@ -79,7 +78,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
this,
s"Initializing CosmosDBArtifactStore for collection [$collName]. Service
endpoint [${client.getServiceEndpoint}], " +
s"Read endpoint [${client.getReadEndpoint}], Write endpoint
[${client.getWriteEndpoint}], Connection Policy
[${client.getConnectionPolicy}], " +
- s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId
[${config.clusterId}]")
+ s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId
[${config.clusterId}], soft delete TTL [${config.softDeleteTTL}]")
//Clone the returned instance as these are mutable
def documentCollection(): DocumentCollection = new
DocumentCollection(collection.toJson)
@@ -94,7 +93,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
val docinfoStr = s"id: $id, rev: ${doc.getETag}"
val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT]
'$collName' saving document: '$docinfoStr'")
- val o = if (doc.getETag == null) {
+ val o = if (isNewDocument(doc)) {
client.createDocument(collection.getSelfLink, doc, newRequestOption(id),
true)
} else {
client.replaceDocument(doc, matchRevOption(id, doc.getETag))
@@ -102,6 +101,28 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
val f = o
.head()
+ .recoverWith {
+ case e: DocumentClientException if isConflict(e) && isNewDocument(doc)
=>
+ val docId = DocId(asJson.fields(_id).convertTo[String])
+ //Fetch existing document and check if it is deleted
+ getRaw(docId).flatMap {
+ case Some(js) =>
+ if (isSoftDeleted(js)) {
+ //Existing document is soft deleted. So can be replaced. Use
the etag of document
+ //and replace it with document we are trying to add
+ val etag = js.fields(Properties.E_TAG).convertTo[String]
+ client.replaceDocument(doc, matchRevOption(id, etag)).head()
+ } else {
+ //Trying to create a new document and found an existing
+ //document which is valid (not soft deleted), so conflict is a
valid outcome
+ throw e
+ }
+ case None =>
+ //Document not found. Should not happen unless someone else
removed
+ //Propagate existing exception
+ throw e
+ }
+ }
.transform(
{ r =>
transid.finished(this, start, s"[PUT] '$collName' completed
document: '$docinfoStr'")
@@ -120,13 +141,14 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
override protected[database] def del(doc: DocInfo)(implicit transid:
TransactionId): Future[Boolean] = {
checkDocHasRevision(doc)
val start = transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL]
'$collName' deleting document: '$doc'")
- val f = client
- .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
- .head()
+ val f = softDeleteTTL match {
+ case Some(_) => softDelete(doc)
+ case None => hardDelete(doc)
+ }
+ val g = f
.transform(
- { r =>
+ { _ =>
transid.finished(this, start, s"[DEL] '$collName' completed
document: '$doc'")
- collectMetrics(delToken, r.getRequestCharge)
true
}, {
case e: DocumentClientException if isNotFound(e) =>
@@ -139,11 +161,35 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
})
reportFailure(
- f,
+ g,
start,
failure => s"[DEL] '$collName' internal error, doc: '$doc', failure:
'${failure.getMessage}'")
}
+ private def hardDelete(doc: DocInfo) = {
+ val f = client
+ .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
+ .head()
+ f.foreach(r => collectMetrics(delToken, r.getRequestCharge))
+ f
+ }
+
+ private def softDelete(doc: DocInfo)(implicit transid: TransactionId) = {
+ for {
+ js <- getAsWhiskJson(doc.id)
+ r <- softDeletePut(doc, js)
+ } yield r
+ }
+
+ private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid:
TransactionId) = {
+ val deletedJs = transform(js, Seq((deleted, Some(JsTrue))))
+ val doc = toCosmosDoc(deletedJs)
+ softDeleteTTL.foreach(doc.setTimeToLive(_))
+ val f = client.replaceDocument(doc, matchRevOption(docInfo)).head()
+ f.foreach(r => collectMetrics(putToken, r.getRequestCharge))
+ f
+ }
+
override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
attachmentHandler: Option[(A, Attached) => A] = None)(
implicit transid: TransactionId,
@@ -151,25 +197,32 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET]
'$collName' finding document: '$doc'")
require(doc != null, "doc undefined")
- val f = client
- .readDocument(selfLinkOf(doc.id), newRequestOption(doc.id))
- .head()
- .transform(
- { rr =>
- val js = getResultToWhiskJsonDoc(rr.getResource)
- transid.finished(this, start, s"[GET] '$collName' completed: found
document '$doc'")
- collectMetrics(getToken, rr.getRequestCharge)
- deserialize[A, DocumentAbstraction](doc, js)
- }, {
- case e: DocumentClientException if isNotFound(e) =>
- transid.finished(this, start, s"[GET] '$collName', document:
'$doc'; not found.")
- // for compatibility
- throw NoDocumentException("not found on 'get'")
- case e => e
- })
- .recoverWith {
- case _: DeserializationException => throw
DocumentUnreadable(Messages.corruptedEntity)
- }
+ val f =
+ client
+ .readDocument(selfLinkOf(doc.id), newRequestOption(doc.id))
+ .head()
+ .transform(
+ { rr =>
+ collectMetrics(getToken, rr.getRequestCharge)
+ if (isSoftDeleted(rr.getResource)) {
+ transid.finished(this, start, s"[GET] '$collName', document:
'$doc'; not found.")
+ // for compatibility
+ throw NoDocumentException("not found on 'get'")
+ } else {
+ val js = getResultToWhiskJsonDoc(rr.getResource)
+ transid.finished(this, start, s"[GET] '$collName' completed:
found document '$doc'")
+ deserialize[A, DocumentAbstraction](doc, js)
+ }
+ }, {
+ case e: DocumentClientException if isNotFound(e) =>
+ transid.finished(this, start, s"[GET] '$collName', document:
'$doc'; not found.")
+ // for compatibility
+ throw NoDocumentException("not found on 'get'")
+ case e => e
+ })
+ .recoverWith {
+ case _: DeserializationException => throw
DocumentUnreadable(Messages.corruptedEntity)
+ }
reportFailure(
f,
@@ -185,13 +238,20 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
.readDocument(selfLinkOf(id), newRequestOption(id))
.head()
.map { rr =>
- val js = getResultToWhiskJsonDoc(rr.getResource)
- transid.finished(this, start, s"[GET_BY_ID] '$collName' completed:
found document '$id'")
collectMetrics(getToken, rr.getRequestCharge)
- Some(js)
+ if (isSoftDeleted(rr.getResource)) {
+ transid.finished(this, start, s"[GET_BY_ID] '$collName' completed:
'$id' not found")
+ None
+ } else {
+ val js = getResultToWhiskJsonDoc(rr.getResource)
+ transid.finished(this, start, s"[GET_BY_ID] '$collName' completed:
found document '$id'")
+ Some(js)
+ }
}
.recoverWith {
- case e: DocumentClientException if isNotFound(e) =>
Future.successful(None)
+ case e: DocumentClientException if isNotFound(e) =>
+ transid.finished(this, start, s"[GET_BY_ID] '$collName' completed:
'$id' not found")
+ Future.successful(None)
}
reportFailure(
@@ -216,6 +276,17 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
}
}
+ private def getAsWhiskJson(id: DocId): Future[JsObject] = {
+ client
+ .readDocument(selfLinkOf(id), newRequestOption(id))
+ .head()
+ .map { rr =>
+ val js = getResultToWhiskJsonDoc(rr.getResource)
+ collectMetrics(getToken, rr.getRequestCharge)
+ js
+ }
+ }
+
override protected[core] def query(table: String,
startKey: List[Any],
endKey: List[Any],
@@ -369,6 +440,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
val mapped = transform(json, fieldsToAdd, fieldsToRemove)
val doc = new Document(mapped.compactPrint)
doc.set(selfLink, createSelfLink(doc.getId))
+ doc.setTimeToLive(null) //Disable any TTL if in effect for earlier revision
doc
}
@@ -386,21 +458,6 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
}
- private def toWhiskJsonDoc(js: JsObject, id: String, etag:
Option[JsString]): JsObject = {
- val fieldsToAdd = Seq((_id, Some(JsString(unescapeId(id)))), (_rev, etag))
- transform(stripInternalFields(js), fieldsToAdd, Seq.empty)
- }
-
- private def transform(json: JsObject, fieldsToAdd: Seq[(String,
Option[JsValue])], fieldsToRemove: Seq[String]) = {
- val fields = json.fields ++ fieldsToAdd.flatMap(f => f._2.map((f._1, _)))
-- fieldsToRemove
- JsObject(fields)
- }
-
- private def stripInternalFields(js: JsObject) = {
- //Strip out all field name starting with '_' which are considered as db
specific internal fields
- JsObject(js.fields.filter { case (k, _) => !k.startsWith("_") && k != cid
})
- }
-
private def toDocInfo[T <: Resource](doc: T) = {
checkDoc(doc)
DocInfo(DocId(unescapeId(doc.getId)), DocRevision(doc.getETag))
@@ -450,4 +507,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru",
"used", tags = tags)(MeasurementUnit.none)
else LogMarkerToken("cosmosdb", "ru", collName,
Some(action))(MeasurementUnit.none)
}
+
+ private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true
+
+ private def isSoftDeleted(js: JsObject) =
js.fields.get(deleted).contains(JsTrue)
+
+ private def isNewDocument(doc: Document) = doc.getETag == null
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index a7e40ae..de0de9d 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -38,7 +38,8 @@ case class CosmosDBConfig(endpoint: String,
consistencyLevel: ConsistencyLevel,
connectionPolicy: ConnectionPolicy,
timeToLive: Option[Duration],
- clusterId: Option[String]) {
+ clusterId: Option[String],
+ softDeleteTTL: Option[FiniteDuration]) {
def createClient(): AsyncDocumentClient = {
new AsyncDocumentClient.Builder()
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
index 1fa74d2..2953cd8 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE,
E_TAG, ID, SELF_LINK}
import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
+import spray.json.{JsObject, JsString, JsValue}
import scala.collection.immutable.Iterable
@@ -44,11 +45,26 @@ private[cosmosdb] object CosmosDBConstants {
* lifetime of a document as different clusters may change the same document
at different times
*/
val clusterId: String = "_clusterId"
+
+ /**
+ * Property indicating that document has been marked as deleted with ttl
+ */
+ val deleted: String = "_deleted"
}
private[cosmosdb] trait CosmosDBUtil {
/**
+ * Name of `id` field as used in WhiskDocument
+ */
+ val _id: String = "_id"
+
+ /**
+ * Name of revision field as used in WhiskDocument
+ */
+ val _rev: String = "_rev"
+
+ /**
* Prepares the json like select clause
* {{{
* Seq("a", "b", "c.d.e") =>
@@ -103,6 +119,31 @@ private[cosmosdb] trait CosmosDBUtil {
id.replace("|", "/")
}
+ def toWhiskJsonDoc(js: JsObject, id: String, etag: Option[JsString]):
JsObject = {
+ val fieldsToAdd = Seq((_id, Some(JsString(unescapeId(id)))), (_rev, etag))
+ transform(stripInternalFields(js), fieldsToAdd, Seq.empty)
+ }
+
+ /**
+ * Transforms a json object by adding and removing fields
+ *
+ * @param json base json object to transform
+ * @param fieldsToAdd list of fields to add. If the value provided is `None`
then it would be ignored
+ * @param fieldsToRemove list of field names to remove
+ * @return transformed json
+ */
+ def transform(json: JsObject,
+ fieldsToAdd: Seq[(String, Option[JsValue])],
+ fieldsToRemove: Seq[String] = Seq.empty): JsObject = {
+ val fields = json.fields ++ fieldsToAdd.flatMap(f => f._2.map((f._1, _)))
-- fieldsToRemove
+ JsObject(fields)
+ }
+
+ private def stripInternalFields(js: JsObject) = {
+ //Strip out all field names starting with '_' which are considered as db
specific internal fields
+ JsObject(js.fields.filter { case (k, _) => !k.startsWith("_") && k != cid
})
+ }
+
}
private[cosmosdb] object CosmosDBUtil extends CosmosDBUtil
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index 47a2bb8..b77d874 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -25,7 +25,7 @@ import com.microsoft.azure.cosmosdb.IndexingMode.Lazy
import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter,
SqlParameterCollection, SqlQuerySpec}
import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
-import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias,
computed}
+import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias,
computed, deleted}
import org.apache.openwhisk.core.database.{
ActivationHandler,
DocumentHandler,
@@ -39,6 +39,7 @@ import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
private[cosmosdb] trait CosmosDBViewMapper {
protected val NOTHING = ""
protected val ALL_FIELDS = "*"
+ protected val notDeleted = s"(NOT(IS_DEFINED(r.$deleted)) OR r.$deleted =
false)"
protected def handler: DocumentHandler
def prepareQuery(ddoc: String,
@@ -89,7 +90,7 @@ private[cosmosdb] abstract class SimpleMapper extends
CosmosDBViewMapper {
val orderField = orderByField(ddoc, viewName)
val order = if (descending) "DESC" else NOTHING
- val query = s"SELECT $selectClause FROM root r WHERE ${whereClause._1}
ORDER BY $orderField $order"
+ val query = s"SELECT $selectClause FROM root r WHERE $notDeleted AND
${whereClause._1} ORDER BY $orderField $order"
prepareSpec(query, whereClause._2)
}
@@ -244,6 +245,7 @@ private[cosmosdb] object SubjectViewMapper extends
CosmosDBViewMapper {
private val BLOCKED = "blocked"
private val SUBJECT = "subject"
private val NAME = "name"
+ private val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED =
false)"
val handler = SubjectHandler
@@ -293,13 +295,14 @@ private[cosmosdb] object SubjectViewMapper extends
CosmosDBViewMapper {
startKey: List[Any],
endKey: List[Any],
count: Boolean): SqlQuerySpec
= {
- val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = false)"
val (where, params) = startKey match {
case (ns: String) :: Nil =>
- (s"$notBlocked AND ((r.$SUBJECT = @name AND IS_DEFINED(r.$KEY)) OR
n.$NAME = @name)", ("@name", ns) :: Nil)
+ (
+ s"$notDeleted AND $notBlocked AND ((r.$SUBJECT = @name AND
IS_DEFINED(r.$KEY)) OR n.$NAME = @name)",
+ ("@name", ns) :: Nil)
case (uuid: String) :: (key: String) :: Nil =>
(
- s"$notBlocked AND ((r.$UUID = @uuid AND r.$KEY = @key) OR (n.$UUID =
@uuid AND n.$KEY = @key))",
+ s"$notDeleted AND $notBlocked AND ((r.$UUID = @uuid AND r.$KEY =
@key) OR (n.$UUID = @uuid AND n.$KEY = @key))",
("@uuid", uuid) :: ("@key", key) :: Nil)
case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey,
$endKey)")
}
@@ -310,9 +313,9 @@ private[cosmosdb] object SubjectViewMapper extends
CosmosDBViewMapper {
prepareSpec(
s"""SELECT ${selectClause(count)} AS $alias
FROM root r
- WHERE r.$BLOCKED = true
+ WHERE (r.$BLOCKED = true
OR r.$CONCURRENT_INVOCATIONS = 0
- OR r.$INVOCATIONS_PER_MIN = 0 """,
+ OR r.$INVOCATIONS_PER_MIN = 0) AND $notDeleted """,
Nil)
private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)"
else "r"
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSoftDeleteTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSoftDeleteTests.scala
new file mode 100644
index 0000000..6cf262c
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSoftDeleteTests.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+import org.apache.openwhisk.core.database.DocumentSerializer
+import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
+import
org.apache.openwhisk.core.database.test.behavior.{ArtifactStoreCRUDBehaviors,
ArtifactStoreQueryBehaviors}
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+import scala.reflect.ClassTag
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBSoftDeleteTests
+ extends FlatSpec
+ with CosmosDBStoreBehaviorBase
+ with ArtifactStoreCRUDBehaviors
+ with ArtifactStoreQueryBehaviors {
+ override def storeType = "CosmosDB_SoftDelete"
+
+ override protected def getAttachmentStore[D <: DocumentSerializer:
ClassTag]() =
+ Some(MemoryAttachmentStoreProvider.makeStore[D]())
+
+ override def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig =
+ config.copy(softDeleteTTL = Some(15.minutes))
+
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
index 18fbf29..b78bdbc 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
@@ -37,7 +37,7 @@ trait CosmosDBStoreBehaviorBase extends FlatSpec with
ArtifactStoreBehaviorBase
override lazy val storeAvailableCheck: Try[Any] = storeConfigTry
- protected lazy val config: CosmosDBConfig = storeConfig.copy(db =
createTestDB().getId)
+ protected lazy val config: CosmosDBConfig =
adaptCosmosDBConfig(storeConfig.copy(db = createTestDB().getId))
override lazy val authStore = {
implicit val docReader: DocumentReader = WhiskDocumentReader
@@ -62,4 +62,6 @@ trait CosmosDBStoreBehaviorBase extends FlatSpec with
ArtifactStoreBehaviorBase
store.asInstanceOf[CosmosDBArtifactStore[_]].attachmentStore
protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]():
Option[AttachmentStore] = None
+
+ protected def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig =
config
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
index 9e8fc1f..61bca67 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.database.test.behavior
import java.time.Instant
import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.{DocumentConflictException,
NoDocumentException}
+import org.apache.openwhisk.core.database.{DocumentConflictException,
DocumentProvider, NoDocumentException}
import org.apache.openwhisk.core.entity._
trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase {
@@ -73,6 +73,22 @@ trait ArtifactStoreCRUDBehaviors extends
ArtifactStoreBehaviorBase {
}
}
+ it should "work if same document was deleted earlier" in {
+ implicit val tid: TransactionId = transid()
+ val auth = newAuth()
+ //1. Create a document
+ val doc = put(authStore, auth)
+
+ //2. Now delete the document
+ delete(authStore, doc) shouldBe true
+
+ //3. Now recreate the same document.
+ val doc2 = put(authStore, auth)
+
+ //Recreating a deleted document should work
+ doc2.rev.empty shouldBe false
+ }
+
behavior of s"${storeType}ArtifactStore delete"
it should "deletes existing document" in {
@@ -159,4 +175,24 @@ trait ArtifactStoreCRUDBehaviors extends
ArtifactStoreBehaviorBase {
implicit val tid: TransactionId = transid()
authStore.get[WhiskAuth](DocInfo("non-existing-doc")).failed.futureValue
shouldBe a[NoDocumentException]
}
+
+ it should "not get a deleted document" in {
+ implicit val tid: TransactionId = transid()
+ val auth = newAuth()
+ //1. Create a document
+ val docInfo = put(authStore, auth)
+
+ //2. Now delete the document
+ delete(authStore, docInfo) shouldBe true
+
+ //3. Now getting a deleted document should fail
+ authStore.get[WhiskAuth](docInfo).failed.futureValue shouldBe
a[NoDocumentException]
+
+ //Check get by id flow also which return none for such "soft" deleted
document
+ authStore match {
+ case provider: DocumentProvider =>
+ provider.get(docInfo.id).futureValue shouldBe None
+ case _ =>
+ }
+ }
}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
index d7c9f03..772403e 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
@@ -91,6 +91,24 @@ trait ArtifactStoreQueryBehaviors extends
ArtifactStoreBehaviorBase {
result.map(_.fields("value")) should contain theSameElementsAs
entities.map(_.summaryAsJson)
}
+ it should "exclude deleted entities" in {
+ implicit val tid: TransactionId = transid()
+
+ val ns = newNS()
+ val entities = Seq(newAction(ns), newAction(ns), newAction(ns))
+ val validEntities = entities.tail
+ val infos = entities.map(put(entityStore, _))
+
+ delete(entityStore, infos.head)
+
+ waitOnView(entityStore, ns.root, 2, WhiskAction.view)
+ val result =
+ query[WhiskEntity](entityStore, WhiskAction.view.name, List(ns.asString,
0), List(ns.asString, TOP, TOP))
+
+ result should have length validEntities.length
+ result.map(_.fields("value")) should contain theSameElementsAs
validEntities.map(_.summaryAsJson)
+ }
+
it should "return result in sorted order" in {
implicit val tid: TransactionId = transid()