This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new e7253ac  Enable soft delete mode for documents in CosmosDB (#4339)
e7253ac is described below

commit e7253ac5bac519df289321a22c5ee923bb96c1f7
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Fri Mar 22 10:25:27 2019 +0530

    Enable soft delete mode for documents in CosmosDB (#4339)
    
    * Enable soft deletion of documents in CosmosDB
---
 common/scala/src/main/resources/application.conf   |   5 +
 .../database/cosmosdb/CosmosDBArtifactStore.scala  | 163 ++++++++++++++-------
 .../core/database/cosmosdb/CosmosDBConfig.scala    |   3 +-
 .../core/database/cosmosdb/CosmosDBUtil.scala      |  41 ++++++
 .../database/cosmosdb/CosmosDBViewMapper.scala     |  17 ++-
 .../cosmosdb/CosmosDBSoftDeleteTests.scala         |  43 ++++++
 .../cosmosdb/CosmosDBStoreBehaviorBase.scala       |   4 +-
 .../test/behavior/ArtifactStoreCRUDBehaviors.scala |  38 ++++-
 .../behavior/ArtifactStoreQueryBehaviors.scala     |  18 +++
 9 files changed, 272 insertions(+), 60 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 435dc94..c176478 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -201,6 +201,11 @@ whisk {
         # Specifies the current clusterId whose value is recorded with document upon any update
         # to indicate which cluster made the change. By default no such value is recorded
         # cluster-id        =
+
+        # Enables soft delete mode where by the document would not be actually deleted. Instead
+        # it would be marked deleted by setting `_deleted` property to true and then actual delete
+        # happens via TTL.
+        # soft-delete-ttl   = 10 h
         connection-policy {
             max-pool-size = 1000
             # When the value of this property is true, the SDK will direct write operations to
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index d7bdee1..c833ae6 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -24,6 +24,7 @@ import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.{Sink, Source}
 import akka.util.ByteString
 import com.microsoft.azure.cosmosdb._
+import com.microsoft.azure.cosmosdb.internal.Constants.Properties
 import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
 import kamon.metric.MeasurementUnit
 import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers, 
MetricEmitter, TransactionId}
@@ -34,7 +35,7 @@ import 
org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
 import org.apache.openwhisk.core.entity.Attachments.Attached
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.http.Messages
-import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, 
RootJsonFormat, _}
+import spray.json._
 
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
@@ -64,14 +65,12 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   protected val client: AsyncDocumentClient = clientRef.get.client
   private[cosmosdb] val (database, collection) = initialize()
 
-  private val _id = "_id"
-  private val _rev = "_rev"
-
   private val putToken = createToken("put", read = false)
   private val delToken = createToken("del", read = false)
   private val getToken = createToken("get")
   private val queryToken = createToken("query")
   private val countToken = createToken("count")
+  private val softDeleteTTL = config.softDeleteTTL.map(_.toSeconds.toInt)
 
   private val clusterIdValue = config.clusterId.map(JsString(_))
 
@@ -79,7 +78,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     this,
     s"Initializing CosmosDBArtifactStore for collection [$collName]. Service 
endpoint [${client.getServiceEndpoint}], " +
       s"Read endpoint [${client.getReadEndpoint}], Write endpoint 
[${client.getWriteEndpoint}], Connection Policy 
[${client.getConnectionPolicy}], " +
-      s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId 
[${config.clusterId}]")
+      s"Time to live [${collection.getDefaultTimeToLive} secs, clusterId 
[${config.clusterId}], soft delete TTL [${config.softDeleteTTL}]")
 
   //Clone the returned instance as these are mutable
   def documentCollection(): DocumentCollection = new 
DocumentCollection(collection.toJson)
@@ -94,7 +93,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     val docinfoStr = s"id: $id, rev: ${doc.getETag}"
     val start = transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] 
'$collName' saving document: '$docinfoStr'")
 
-    val o = if (doc.getETag == null) {
+    val o = if (isNewDocument(doc)) {
       client.createDocument(collection.getSelfLink, doc, newRequestOption(id), 
true)
     } else {
       client.replaceDocument(doc, matchRevOption(id, doc.getETag))
@@ -102,6 +101,28 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
 
     val f = o
       .head()
+      .recoverWith {
+        case e: DocumentClientException if isConflict(e) && isNewDocument(doc) 
=>
+          val docId = DocId(asJson.fields(_id).convertTo[String])
+          //Fetch existing document and check if its deleted
+          getRaw(docId).flatMap {
+            case Some(js) =>
+              if (isSoftDeleted(js)) {
+                //Existing document is soft deleted. So can be replaced. Use 
the etag of document
+                //and replace it with document we are trying to add
+                val etag = js.fields(Properties.E_TAG).convertTo[String]
+                client.replaceDocument(doc, matchRevOption(id, etag)).head()
+              } else {
+                //Trying to create a new document and found an existing
+                //Document which is valid (not soft delete) then conflict is a 
valid outcome
+                throw e
+              }
+            case None =>
+              //Document not found. Should not happen unless someone else 
removed
+              //Propagate existing exception
+              throw e
+          }
+      }
       .transform(
         { r =>
           transid.finished(this, start, s"[PUT] '$collName' completed 
document: '$docinfoStr'")
@@ -120,13 +141,14 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
   override protected[database] def del(doc: DocInfo)(implicit transid: 
TransactionId): Future[Boolean] = {
     checkDocHasRevision(doc)
     val start = transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] 
'$collName' deleting document: '$doc'")
-    val f = client
-      .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
-      .head()
+    val f = softDeleteTTL match {
+      case Some(_) => softDelete(doc)
+      case None    => hardDelete(doc)
+    }
+    val g = f
       .transform(
-        { r =>
+        { _ =>
           transid.finished(this, start, s"[DEL] '$collName' completed 
document: '$doc'")
-          collectMetrics(delToken, r.getRequestCharge)
           true
         }, {
           case e: DocumentClientException if isNotFound(e) =>
@@ -139,11 +161,35 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
         })
 
     reportFailure(
-      f,
+      g,
       start,
       failure => s"[DEL] '$collName' internal error, doc: '$doc', failure: 
'${failure.getMessage}'")
   }
 
+  private def hardDelete(doc: DocInfo) = {
+    val f = client
+      .deleteDocument(selfLinkOf(doc.id), matchRevOption(doc))
+      .head()
+    f.foreach(r => collectMetrics(delToken, r.getRequestCharge))
+    f
+  }
+
+  private def softDelete(doc: DocInfo)(implicit transid: TransactionId) = {
+    for {
+      js <- getAsWhiskJson(doc.id)
+      r <- softDeletePut(doc, js)
+    } yield r
+  }
+
+  private def softDeletePut(docInfo: DocInfo, js: JsObject)(implicit transid: 
TransactionId) = {
+    val deletedJs = transform(js, Seq((deleted, Some(JsTrue))))
+    val doc = toCosmosDoc(deletedJs)
+    softDeleteTTL.foreach(doc.setTimeToLive(_))
+    val f = client.replaceDocument(doc, matchRevOption(docInfo)).head()
+    f.foreach(r => collectMetrics(putToken, r.getRequestCharge))
+    f
+  }
+
   override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
                                                                  
attachmentHandler: Option[(A, Attached) => A] = None)(
     implicit transid: TransactionId,
@@ -151,25 +197,32 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] 
'$collName' finding document: '$doc'")
 
     require(doc != null, "doc undefined")
-    val f = client
-      .readDocument(selfLinkOf(doc.id), newRequestOption(doc.id))
-      .head()
-      .transform(
-        { rr =>
-          val js = getResultToWhiskJsonDoc(rr.getResource)
-          transid.finished(this, start, s"[GET] '$collName' completed: found 
document '$doc'")
-          collectMetrics(getToken, rr.getRequestCharge)
-          deserialize[A, DocumentAbstraction](doc, js)
-        }, {
-          case e: DocumentClientException if isNotFound(e) =>
-            transid.finished(this, start, s"[GET] '$collName', document: 
'$doc'; not found.")
-            // for compatibility
-            throw NoDocumentException("not found on 'get'")
-          case e => e
-        })
-      .recoverWith {
-        case _: DeserializationException => throw 
DocumentUnreadable(Messages.corruptedEntity)
-      }
+    val f =
+      client
+        .readDocument(selfLinkOf(doc.id), newRequestOption(doc.id))
+        .head()
+        .transform(
+          { rr =>
+            collectMetrics(getToken, rr.getRequestCharge)
+            if (isSoftDeleted(rr.getResource)) {
+              transid.finished(this, start, s"[GET] '$collName', document: 
'$doc'; not found.")
+              // for compatibility
+              throw NoDocumentException("not found on 'get'")
+            } else {
+              val js = getResultToWhiskJsonDoc(rr.getResource)
+              transid.finished(this, start, s"[GET] '$collName' completed: 
found document '$doc'")
+              deserialize[A, DocumentAbstraction](doc, js)
+            }
+          }, {
+            case e: DocumentClientException if isNotFound(e) =>
+              transid.finished(this, start, s"[GET] '$collName', document: 
'$doc'; not found.")
+              // for compatibility
+              throw NoDocumentException("not found on 'get'")
+            case e => e
+          })
+        .recoverWith {
+          case _: DeserializationException => throw 
DocumentUnreadable(Messages.corruptedEntity)
+        }
 
     reportFailure(
       f,
@@ -185,13 +238,20 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       .readDocument(selfLinkOf(id), newRequestOption(id))
       .head()
       .map { rr =>
-        val js = getResultToWhiskJsonDoc(rr.getResource)
-        transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: 
found document '$id'")
         collectMetrics(getToken, rr.getRequestCharge)
-        Some(js)
+        if (isSoftDeleted(rr.getResource)) {
+          transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: 
'$id' not found")
+          None
+        } else {
+          val js = getResultToWhiskJsonDoc(rr.getResource)
+          transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: 
found document '$id'")
+          Some(js)
+        }
       }
       .recoverWith {
-        case e: DocumentClientException if isNotFound(e) => 
Future.successful(None)
+        case e: DocumentClientException if isNotFound(e) =>
+          transid.finished(this, start, s"[GET_BY_ID] '$collName' completed: 
'$id' not found")
+          Future.successful(None)
       }
 
     reportFailure(
@@ -216,6 +276,17 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
       }
   }
 
+  private def getAsWhiskJson(id: DocId): Future[JsObject] = {
+    client
+      .readDocument(selfLinkOf(id), newRequestOption(id))
+      .head()
+      .map { rr =>
+        val js = getResultToWhiskJsonDoc(rr.getResource)
+        collectMetrics(getToken, rr.getRequestCharge)
+        js
+      }
+  }
+
   override protected[core] def query(table: String,
                                      startKey: List[Any],
                                      endKey: List[Any],
@@ -369,6 +440,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     val mapped = transform(json, fieldsToAdd, fieldsToRemove)
     val doc = new Document(mapped.compactPrint)
     doc.set(selfLink, createSelfLink(doc.getId))
+    doc.setTimeToLive(null) //Disable any TTL if in effect for earlier revision
     doc
   }
 
@@ -386,21 +458,6 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     toWhiskJsonDoc(js, doc.getId, Some(JsString(doc.getETag)))
   }
 
-  private def toWhiskJsonDoc(js: JsObject, id: String, etag: 
Option[JsString]): JsObject = {
-    val fieldsToAdd = Seq((_id, Some(JsString(unescapeId(id)))), (_rev, etag))
-    transform(stripInternalFields(js), fieldsToAdd, Seq.empty)
-  }
-
-  private def transform(json: JsObject, fieldsToAdd: Seq[(String, 
Option[JsValue])], fieldsToRemove: Seq[String]) = {
-    val fields = json.fields ++ fieldsToAdd.flatMap(f => f._2.map((f._1, _))) 
-- fieldsToRemove
-    JsObject(fields)
-  }
-
-  private def stripInternalFields(js: JsObject) = {
-    //Strip out all field name starting with '_' which are considered as db 
specific internal fields
-    JsObject(js.fields.filter { case (k, _) => !k.startsWith("_") && k != cid 
})
-  }
-
   private def toDocInfo[T <: Resource](doc: T) = {
     checkDoc(doc)
     DocInfo(DocId(unescapeId(doc.getId)), DocRevision(doc.getETag))
@@ -450,4 +507,10 @@ class CosmosDBArtifactStore[DocumentAbstraction <: 
DocumentSerializer](protected
     if (TransactionId.metricsKamonTags) LogMarkerToken("cosmosdb", "ru", 
"used", tags = tags)(MeasurementUnit.none)
     else LogMarkerToken("cosmosdb", "ru", collName, 
Some(action))(MeasurementUnit.none)
   }
+
+  private def isSoftDeleted(doc: Document) = doc.getBoolean(deleted) == true
+
+  private def isSoftDeleted(js: JsObject) = 
js.fields.get(deleted).contains(JsTrue)
+
+  private def isNewDocument(doc: Document) = doc.getETag == null
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
index a7e40ae..de0de9d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBConfig.scala
@@ -38,7 +38,8 @@ case class CosmosDBConfig(endpoint: String,
                           consistencyLevel: ConsistencyLevel,
                           connectionPolicy: ConnectionPolicy,
                           timeToLive: Option[Duration],
-                          clusterId: Option[String]) {
+                          clusterId: Option[String],
+                          softDeleteTTL: Option[FiniteDuration]) {
 
   def createClient(): AsyncDocumentClient = {
     new AsyncDocumentClient.Builder()
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
index 1fa74d2..2953cd8 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBUtil.scala
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.database.cosmosdb
 
 import com.microsoft.azure.cosmosdb.internal.Constants.Properties.{AGGREGATE, 
E_TAG, ID, SELF_LINK}
 import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
+import spray.json.{JsObject, JsString, JsValue}
 
 import scala.collection.immutable.Iterable
 
@@ -44,11 +45,26 @@ private[cosmosdb] object CosmosDBConstants {
    * lifetime of a document as different clusters may change the same document 
at different times
    */
   val clusterId: String = "_clusterId"
+
+  /**
+   * Property indicating that document has been marked as deleted with ttl
+   */
+  val deleted: String = "_deleted"
 }
 
 private[cosmosdb] trait CosmosDBUtil {
 
   /**
+   * Name of `id` field as used in WhiskDocument
+   */
+  val _id: String = "_id"
+
+  /**
+   * Name of revision field as used in WhiskDocument
+   */
+  val _rev: String = "_rev"
+
+  /**
    * Prepares the json like select clause
    * {{{
    *   Seq("a", "b", "c.d.e") =>
@@ -103,6 +119,31 @@ private[cosmosdb] trait CosmosDBUtil {
     id.replace("|", "/")
   }
 
+  def toWhiskJsonDoc(js: JsObject, id: String, etag: Option[JsString]): 
JsObject = {
+    val fieldsToAdd = Seq((_id, Some(JsString(unescapeId(id)))), (_rev, etag))
+    transform(stripInternalFields(js), fieldsToAdd, Seq.empty)
+  }
+
+  /**
+   * Transforms a json object by adding and removing fields
+   *
+   * @param json base json object to transform
+   * @param fieldsToAdd list of fields to add. If the value provided is `None` 
then it would be ignored
+   * @param fieldsToRemove list of field names to remove
+   * @return transformed json
+   */
+  def transform(json: JsObject,
+                fieldsToAdd: Seq[(String, Option[JsValue])],
+                fieldsToRemove: Seq[String] = Seq.empty): JsObject = {
+    val fields = json.fields ++ fieldsToAdd.flatMap(f => f._2.map((f._1, _))) 
-- fieldsToRemove
+    JsObject(fields)
+  }
+
+  private def stripInternalFields(js: JsObject) = {
+    //Strip out all field name starting with '_' which are considered as db 
specific internal fields
+    JsObject(js.fields.filter { case (k, _) => !k.startsWith("_") && k != cid 
})
+  }
+
 }
 
 private[cosmosdb] object CosmosDBUtil extends CosmosDBUtil
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
index 47a2bb8..b77d874 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBViewMapper.scala
@@ -25,7 +25,7 @@ import com.microsoft.azure.cosmosdb.IndexingMode.Lazy
 import com.microsoft.azure.cosmosdb.{PartitionKeyDefinition, SqlParameter, 
SqlParameterCollection, SqlQuerySpec}
 import org.apache.openwhisk.core.database.ActivationHandler.NS_PATH
 import org.apache.openwhisk.core.database.WhisksHandler.ROOT_NS
-import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, 
computed}
+import org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants.{alias, 
computed, deleted}
 import org.apache.openwhisk.core.database.{
   ActivationHandler,
   DocumentHandler,
@@ -39,6 +39,7 @@ import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
 private[cosmosdb] trait CosmosDBViewMapper {
   protected val NOTHING = ""
   protected val ALL_FIELDS = "*"
+  protected val notDeleted = s"(NOT(IS_DEFINED(r.$deleted)) OR r.$deleted = 
false)"
   protected def handler: DocumentHandler
 
   def prepareQuery(ddoc: String,
@@ -89,7 +90,7 @@ private[cosmosdb] abstract class SimpleMapper extends 
CosmosDBViewMapper {
     val orderField = orderByField(ddoc, viewName)
     val order = if (descending) "DESC" else NOTHING
 
-    val query = s"SELECT $selectClause FROM root r WHERE ${whereClause._1} 
ORDER BY $orderField $order"
+    val query = s"SELECT $selectClause FROM root r WHERE $notDeleted AND 
${whereClause._1} ORDER BY $orderField $order"
 
     prepareSpec(query, whereClause._2)
   }
@@ -244,6 +245,7 @@ private[cosmosdb] object SubjectViewMapper extends 
CosmosDBViewMapper {
   private val BLOCKED = "blocked"
   private val SUBJECT = "subject"
   private val NAME = "name"
+  private val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = 
false)"
 
   val handler = SubjectHandler
 
@@ -293,13 +295,14 @@ private[cosmosdb] object SubjectViewMapper extends 
CosmosDBViewMapper {
                                                  startKey: List[Any],
                                                  endKey: List[Any],
                                                  count: Boolean): SqlQuerySpec 
= {
-    val notBlocked = s"(NOT(IS_DEFINED(r.$BLOCKED)) OR r.$BLOCKED = false)"
     val (where, params) = startKey match {
       case (ns: String) :: Nil =>
-        (s"$notBlocked AND ((r.$SUBJECT = @name AND IS_DEFINED(r.$KEY)) OR 
n.$NAME = @name)", ("@name", ns) :: Nil)
+        (
+          s"$notDeleted AND $notBlocked AND ((r.$SUBJECT = @name AND 
IS_DEFINED(r.$KEY)) OR n.$NAME = @name)",
+          ("@name", ns) :: Nil)
       case (uuid: String) :: (key: String) :: Nil =>
         (
-          s"$notBlocked AND ((r.$UUID = @uuid AND r.$KEY = @key) OR (n.$UUID = 
@uuid AND n.$KEY = @key))",
+          s"$notDeleted AND $notBlocked AND ((r.$UUID = @uuid AND r.$KEY = 
@key) OR (n.$UUID = @uuid AND n.$KEY = @key))",
           ("@uuid", uuid) :: ("@key", key) :: Nil)
       case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, 
$endKey)")
     }
@@ -310,9 +313,9 @@ private[cosmosdb] object SubjectViewMapper extends 
CosmosDBViewMapper {
     prepareSpec(
       s"""SELECT ${selectClause(count)} AS $alias
                   FROM   root r
-                  WHERE  r.$BLOCKED = true
+                  WHERE  (r.$BLOCKED = true
                           OR r.$CONCURRENT_INVOCATIONS = 0
-                          OR r.$INVOCATIONS_PER_MIN = 0 """,
+                          OR r.$INVOCATIONS_PER_MIN = 0) AND $notDeleted """,
       Nil)
 
   private def selectClause(count: Boolean) = if (count) "TOP 1 VALUE COUNT(r)" 
else "r"
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSoftDeleteTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSoftDeleteTests.scala
new file mode 100644
index 0000000..6cf262c
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBSoftDeleteTests.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.cosmosdb
+import org.apache.openwhisk.core.database.DocumentSerializer
+import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
+import 
org.apache.openwhisk.core.database.test.behavior.{ArtifactStoreCRUDBehaviors, 
ArtifactStoreQueryBehaviors}
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+import scala.reflect.ClassTag
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class CosmosDBSoftDeleteTests
+    extends FlatSpec
+    with CosmosDBStoreBehaviorBase
+    with ArtifactStoreCRUDBehaviors
+    with ArtifactStoreQueryBehaviors {
+  override def storeType = "CosmosDB_SoftDelete"
+
+  override protected def getAttachmentStore[D <: DocumentSerializer: 
ClassTag]() =
+    Some(MemoryAttachmentStoreProvider.makeStore[D]())
+
+  override def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig =
+    config.copy(softDeleteTTL = Some(15.minutes))
+
+}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
index 18fbf29..b78bdbc 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBStoreBehaviorBase.scala
@@ -37,7 +37,7 @@ trait CosmosDBStoreBehaviorBase extends FlatSpec with 
ArtifactStoreBehaviorBase
 
   override lazy val storeAvailableCheck: Try[Any] = storeConfigTry
 
-  protected lazy val config: CosmosDBConfig = storeConfig.copy(db = 
createTestDB().getId)
+  protected lazy val config: CosmosDBConfig = 
adaptCosmosDBConfig(storeConfig.copy(db = createTestDB().getId))
 
   override lazy val authStore = {
     implicit val docReader: DocumentReader = WhiskDocumentReader
@@ -62,4 +62,6 @@ trait CosmosDBStoreBehaviorBase extends FlatSpec with 
ArtifactStoreBehaviorBase
     store.asInstanceOf[CosmosDBArtifactStore[_]].attachmentStore
 
   protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): 
Option[AttachmentStore] = None
+
+  protected def adaptCosmosDBConfig(config: CosmosDBConfig): CosmosDBConfig = 
config
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
index 9e8fc1f..61bca67 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala
@@ -20,7 +20,7 @@ package org.apache.openwhisk.core.database.test.behavior
 import java.time.Instant
 
 import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.{DocumentConflictException, 
NoDocumentException}
+import org.apache.openwhisk.core.database.{DocumentConflictException, 
DocumentProvider, NoDocumentException}
 import org.apache.openwhisk.core.entity._
 
 trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase {
@@ -73,6 +73,22 @@ trait ArtifactStoreCRUDBehaviors extends 
ArtifactStoreBehaviorBase {
     }
   }
 
+  it should "work if same document was deleted earlier" in {
+    implicit val tid: TransactionId = transid()
+    val auth = newAuth()
+    //1. Create a document
+    val doc = put(authStore, auth)
+
+    //2. Now delete the document
+    delete(authStore, doc) shouldBe true
+
+    //3. Now recreate the same document.
+    val doc2 = put(authStore, auth)
+
+    //Recreating a deleted document should work
+    doc2.rev.empty shouldBe false
+  }
+
   behavior of s"${storeType}ArtifactStore delete"
 
   it should "deletes existing document" in {
@@ -159,4 +175,24 @@ trait ArtifactStoreCRUDBehaviors extends 
ArtifactStoreBehaviorBase {
     implicit val tid: TransactionId = transid()
     authStore.get[WhiskAuth](DocInfo("non-existing-doc")).failed.futureValue 
shouldBe a[NoDocumentException]
   }
+
+  it should "not get a deleted document" in {
+    implicit val tid: TransactionId = transid()
+    val auth = newAuth()
+    //1. Create a document
+    val docInfo = put(authStore, auth)
+
+    //2. Now delete the document
+    delete(authStore, docInfo) shouldBe true
+
+    //3. Now getting a deleted document should fail
+    authStore.get[WhiskAuth](docInfo).failed.futureValue shouldBe 
a[NoDocumentException]
+
+    //Check get by id flow also which return none for such "soft" deleted 
document
+    authStore match {
+      case provider: DocumentProvider =>
+        provider.get(docInfo.id).futureValue shouldBe None
+      case _ =>
+    }
+  }
 }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
index d7c9f03..772403e 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreQueryBehaviors.scala
@@ -91,6 +91,24 @@ trait ArtifactStoreQueryBehaviors extends 
ArtifactStoreBehaviorBase {
     result.map(_.fields("value")) should contain theSameElementsAs 
entities.map(_.summaryAsJson)
   }
 
+  it should "exclude deleted entities" in {
+    implicit val tid: TransactionId = transid()
+
+    val ns = newNS()
+    val entities = Seq(newAction(ns), newAction(ns), newAction(ns))
+    val validEntities = entities.tail
+    val infos = entities.map(put(entityStore, _))
+
+    delete(entityStore, infos.head)
+
+    waitOnView(entityStore, ns.root, 2, WhiskAction.view)
+    val result =
+      query[WhiskEntity](entityStore, WhiskAction.view.name, List(ns.asString, 
0), List(ns.asString, TOP, TOP))
+
+    result should have length validEntities.length
+    result.map(_.fields("value")) should contain theSameElementsAs 
validEntities.map(_.summaryAsJson)
+  }
+
   it should "return result in sorted order" in {
     implicit val tid: TransactionId = transid()
 

Reply via email to