This is an automated email from the ASF dual-hosted git repository.

tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d41b39  Enable attachment reads from CloudFront (#4392)
8d41b39 is described below

commit 8d41b39dba3ea5c6e3e795c547051ba00f6bd8a5
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Apr 16 01:50:38 2019 +0530

    Enable attachment reads from CloudFront (#4392)
    
    * Enable attachment reads from CloudFront
---
 common/scala/build.gradle                          |  1 +
 common/scala/src/main/resources/s3-reference.conf  | 16 +++++
 .../core/database/s3/CloudFrontSigner.scala        | 54 +++++++++++++++
 .../core/database/s3/S3AttachmentStore.scala       | 77 +++++++++++++++++----
 .../core/database/s3/CloudFrontSignerTests.scala   | 78 ++++++++++++++++++++++
 .../s3/S3AttachmentStoreCloudFrontTests.scala      | 47 +++++++++++++
 .../apache/openwhisk/core/database/s3/S3Aws.scala  | 12 ++--
 .../database/test/AttachmentStoreBehaviors.scala   | 16 +++--
 8 files changed, 278 insertions(+), 23 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 089a8d2..84d3fb5 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -97,6 +97,7 @@ dependencies {
         exclude group: 'com.fasterxml.jackson.core'
         exclude group: 'com.fasterxml.jackson.dataformat'
     }
+    compile ('com.amazonaws:aws-java-sdk-cloudfront:1.11.517')
     scoverage gradle.scoverage.deps
 }
 
diff --git a/common/scala/src/main/resources/s3-reference.conf 
b/common/scala/src/main/resources/s3-reference.conf
index 1413096..664bdcf 100644
--- a/common/scala/src/main/resources/s3-reference.conf
+++ b/common/scala/src/main/resources/s3-reference.conf
@@ -10,6 +10,22 @@ whisk {
     # Folder path within the bucket (optional)
     # prefix =
 
+    # CloudFront configuration - Enable this if the attachments are to be read 
from the CloudFront CDN
+    # cloud-front-config {
+        # CloudFront domain name
+        # domain-name = xxx.cloudfront.net
+
+        # Signing key pair id
+        # 
https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/private-content-trusted-signers.html#private-content-creating-cloudfront-key-pairs
+        # key-pair-id = AAXXXX
+
+        # Private key content in PEM format
+        # private-key = """-----BEGIN RSA PRIVATE KEY-----xxx-----END RSA 
PRIVATE KEY-----"""
+
+        # Timeout for generated signed url
+        # timeout = 10 min
+    # }
+
     # See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
     alpakka {
       # whether the buffer request chunks (up to 5MB each) to "memory" or 
"disk"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/CloudFrontSigner.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/CloudFrontSigner.scala
new file mode 100644
index 0000000..0c36bc4
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/CloudFrontSigner.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.s3
+import java.io.ByteArrayInputStream
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivateKey
+import java.time.Instant
+import java.util.Date
+
+import akka.http.scaladsl.model.Uri
+import com.amazonaws.auth.PEM
+import com.amazonaws.services.cloudfront.CloudFrontUrlSigner
+import com.amazonaws.services.cloudfront.util.SignerUtils
+import com.amazonaws.services.cloudfront.util.SignerUtils.Protocol
+
+import scala.concurrent.duration._
+
+case class CloudFrontConfig(domainName: String,
+                            keyPairId: String,
+                            privateKey: String,
+                            timeout: FiniteDuration = 10.minutes)
+
+case class CloudFrontSigner(config: CloudFrontConfig) extends UrlSigner {
+  private val privateKey = createPrivateKey(config.privateKey)
+
+  override def getSignedURL(s3ObjectKey: String): Uri = {
+    val resourcePath = SignerUtils.generateResourcePath(Protocol.https, 
config.domainName, s3ObjectKey)
+    val date = Date.from(Instant.now().plusSeconds(config.timeout.toSeconds))
+    val url = CloudFrontUrlSigner.getSignedURLWithCannedPolicy(resourcePath, 
config.keyPairId, privateKey, date)
+    Uri(url)
+  }
+
+  override def toString: String = s"CloudFront Signer - ${config.domainName}"
+
+  private def createPrivateKey(keyContent: String): PrivateKey = {
+    val is = new ByteArrayInputStream(keyContent.getBytes(UTF_8))
+    PEM.readPrivateKey(is)
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
index d1f675f..8122274 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala
@@ -17,16 +17,22 @@
 
 package org.apache.openwhisk.core.database.s3
 
+import akka.NotUsed
 import akka.actor.ActorSystem
 import akka.event.Logging
-import akka.http.scaladsl.model.ContentType
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model.headers.CacheDirectives._
+import akka.http.scaladsl.model.headers._
+import akka.http.scaladsl.model.{ContentType, HttpRequest, HttpResponse, 
ResponseEntity, Uri}
+import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.ActorMaterializer
+import akka.stream.alpakka.s3.acl.CannedAcl
+import akka.stream.alpakka.s3.impl.S3Headers
 import akka.stream.alpakka.s3.scaladsl.S3Client
 import akka.stream.alpakka.s3.{S3Exception, S3Settings}
 import akka.stream.scaladsl.{Sink, Source}
 import akka.util.ByteString
 import com.typesafe.config.Config
-import pureconfig.loadConfigOrThrow
 import org.apache.openwhisk.common.LoggingMarkers.{
   DATABASE_ATTS_DELETE,
   DATABASE_ATT_DELETE,
@@ -38,17 +44,21 @@ import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.database.StoreUtils._
 import org.apache.openwhisk.core.database._
 import org.apache.openwhisk.core.entity.DocId
+import pureconfig.loadConfigOrThrow
 
+import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.reflect.ClassTag
 
 object S3AttachmentStoreProvider extends AttachmentStoreProvider {
   val alpakkaConfigKey = s"${ConfigKeys.s3}.alpakka"
-  case class S3Config(bucket: String, prefix: Option[String]) {
+  case class S3Config(bucket: String, prefix: Option[String], 
cloudFrontConfig: Option[CloudFrontConfig] = None) {
     def prefixFor[D](implicit tag: ClassTag[D]): String = {
       val className = tag.runtimeClass.getSimpleName.toLowerCase
       prefix.map(p => s"$p/$className").getOrElse(className)
     }
+
+    def signer: Option[UrlSigner] = cloudFrontConfig.map(CloudFrontSigner)
   }
 
   override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit 
actorSystem: ActorSystem,
@@ -56,7 +66,7 @@ object S3AttachmentStoreProvider extends 
AttachmentStoreProvider {
                                                               materializer: 
ActorMaterializer): AttachmentStore = {
     val client = new S3Client(S3Settings(alpakkaConfigKey))
     val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
-    new S3AttachmentStore(client, config.bucket, config.prefixFor[D])
+    new S3AttachmentStore(client, config.bucket, config.prefixFor[D], 
config.signer)
   }
 
   def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit 
actorSystem: ActorSystem,
@@ -64,19 +74,30 @@ object S3AttachmentStoreProvider extends 
AttachmentStoreProvider {
                                                                    
materializer: ActorMaterializer): AttachmentStore = {
     val client = new S3Client(S3Settings(config, alpakkaConfigKey))
     val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
-    new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D])
+    new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D], 
s3config.signer)
   }
 
 }
-class S3AttachmentStore(client: S3Client, bucket: String, prefix: 
String)(implicit system: ActorSystem,
-                                                                          
logging: Logging,
-                                                                          
materializer: ActorMaterializer)
+
+trait UrlSigner {
+  def getSignedURL(s3ObjectKey: String): Uri
+}
+
+class S3AttachmentStore(client: S3Client, bucket: String, prefix: String, 
urlSigner: Option[UrlSigner])(
+  implicit system: ActorSystem,
+  logging: Logging,
+  materializer: ActorMaterializer)
     extends AttachmentStore {
+  private val commonS3Headers = S3Headers(
+    Seq(
+      CannedAcl.Private.header, //All objects are private
+      `Cache-Control`(`max-age`(365.days.toSeconds))) //As objects are 
immutable cache them for long time
+  )
   override val scheme = "s3"
 
   override protected[core] implicit val executionContext: ExecutionContext = 
system.dispatcher
 
-  logging.info(this, s"Initializing S3AttachmentStore with bucket=[$bucket], 
prefix=[$prefix]")
+  logging.info(this, s"Initializing S3AttachmentStore with bucket=[$bucket], 
prefix=[$prefix], signer=[$urlSigner]")
 
   override protected[core] def attach(
     docId: DocId,
@@ -90,7 +111,9 @@ class S3AttachmentStore(client: S3Client, bucket: String, 
prefix: String)(implic
     //A possible optimization for small attachments < 5MB can be to use 
putObject instead of multipartUpload
     //and thus use 1 remote call instead of 3
     val f = docStream
-      .runWith(combinedSink(client.multipartUpload(bucket, objectKey(docId, 
name), contentType)))
+      .runWith(
+        combinedSink(client
+          .multipartUploadWithHeaders(bucket, objectKey(docId, name), 
contentType, s3Headers = Some(commonS3Headers))))
       .map(r => AttachResult(r.digest, r.length))
 
     f.foreach(_ =>
@@ -111,7 +134,7 @@ class S3AttachmentStore(client: S3Client, bucket: String, 
prefix: String)(implic
         this,
         DATABASE_ATT_GET,
         s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: 
$docId'")
-    val (source, _) = client.download(bucket, objectKey(docId, name))
+    val source = getAttachmentSource(objectKey(docId, name))
 
     val f = source.runWith(sink)
 
@@ -139,6 +162,30 @@ class S3AttachmentStore(client: S3Client, bucket: String, 
prefix: String)(implic
         s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: 
$docId', failure: '${failure.getMessage}'")
   }
 
+  private def getAttachmentSource(objectKey: String): Source[ByteString, 
NotUsed] = urlSigner match {
+    case Some(signer) => getUrlContent(signer.getSignedURL(objectKey))
+    case None         => client.download(bucket, objectKey)._1
+  }
+
+  private def getUrlContent(uri: Uri): Source[ByteString, NotUsed] = {
+    val future = Http().singleRequest(HttpRequest(uri = uri))
+    Source
+      .fromFuture(future.flatMap(entityForSuccess))
+      .map(_.dataBytes)
+      .flatMapConcat(identity)
+  }
+
+  private def entityForSuccess(resp: HttpResponse): Future[ResponseEntity] =
+    resp match {
+      case HttpResponse(status, _, entity, _) if status.isSuccess() && 
!status.isRedirection() =>
+        Future.successful(entity)
+      case HttpResponse(_, _, entity, _) =>
+        Unmarshal(entity).to[String].map { err =>
+          //With CloudFront also the error message conforms to the same S3 
exception format
+          throw new S3Exception(err)
+        }
+    }
+
   override protected[core] def deleteAttachments(docId: DocId)(implicit 
transid: TransactionId): Future[Boolean] = {
     val start =
       transid.started(
@@ -195,9 +242,11 @@ class S3AttachmentStore(client: S3Client, bucket: String, 
prefix: String)(implic
   private def isMissingKeyException(e: Throwable): Boolean = {
     //In some case S3Exception is a sub cause. So need to recurse
     e match {
-      case s: S3Exception if s.code == "NoSuchKey"             => true
-      case t if t != null && isMissingKeyException(t.getCause) => true
-      case _                                                   => false
+      case s: S3Exception if s.code == "NoSuchKey" => true
+      // In case of CloudFront a missing key would be reflected as access 
denied
+      case s: S3Exception if s.code == "AccessDenied" && urlSigner.isDefined 
=> true
+      case t if t != null && isMissingKeyException(t.getCause)               
=> true
+      case _                                                                 
=> false
     }
   }
 }
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/CloudFrontSignerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/CloudFrontSignerTests.scala
new file mode 100644
index 0000000..8aa9f03
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/CloudFrontSignerTests.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.s3
+import akka.http.scaladsl.model.Uri.Path
+import com.typesafe.config.ConfigFactory
+import java.time.Instant
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.database.s3.S3AttachmentStoreProvider.S3Config
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers, OptionValues}
+import pureconfig.loadConfigOrThrow
+
+@RunWith(classOf[JUnitRunner])
+class CloudFrontSignerTests extends FlatSpec with Matchers with OptionValues {
+
+  val qt = "\"\"\""
+  val privateKey =
+    """-----BEGIN RSA PRIVATE KEY-----
+      |MIIBPAIBAAJBAOY+Q7vyH1SnCUoFIpzqmZe1TNCxiE6zuiMRmjuJqiAzQWdb5hEA
+      |ZaC+f7Lcu53IvczZR0KsP4JndzG23rVg/y0CAwEAAQJBAMK+F3x4ppdrUSgSf9xJ
+      |cfAnoPlDsA8hZWcUFGgXYJYqKYw3NqoYG5fwyZ7xrwdMhpbdgD++nsBC/JMwUhEB
+      |h+ECIQDzj5Tbd7WvfaKGjozwQgHA9u3f53kxCWovpFEngU6VNwIhAPIAkAPnzuDr
+      |q3cEyAbM49ozjyc6/NOV6QK65HQj1gC7AiBrax/Ty3At/dL4VVaDgBkV6dHvtj8V
+      |CXnzmRzRt43Y8QIhAIzrvPE5RGP/eEqHUz96glhm276Zf+5qBlTbpfrnf0/PAiEA
+      |r1vFsvC8+KSHv7XGU1xfeiHHpHxEfDvJlX7/CxeWumQ=
+      |-----END RSA PRIVATE KEY-----
+      |""".stripMargin
+
+  val keyPairId = "OPENWHISKISFUNTOUSE"
+  val configString =
+    s"""whisk {
+       |  s3 {
+       |    bucket = "openwhisk-test"
+       |    prefix = "dev"
+       |    cloud-front-config {
+       |      domain-name = "foo.com"
+       |      key-pair-id = "$keyPairId"
+       |      private-key = $qt$privateKey$qt
+       |      timeout = 10 m
+       |    }
+       |  }
+       |}""".stripMargin
+
+  behavior of "CloudFront config"
+
+  it should "generate a signed url" in {
+    val config = 
ConfigFactory.parseString(configString).withFallback(ConfigFactory.load())
+    val s3Config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
+    val signer = CloudFrontSigner(s3Config.cloudFrontConfig.get)
+    val expiration = 
Instant.now().plusSeconds(s3Config.cloudFrontConfig.get.timeout.toSeconds)
+    val uri = signer.getSignedURL("bar")
+    val query = uri.query()
+
+    //A signed url is of format
+    //https://<domain-name>/<object 
key>?Expires=xxx&Signature=xxx&Key-Pair-Id=xxx
+    uri.scheme shouldBe "https"
+    uri.path.tail shouldBe Path("bar")
+    query.get("Expires") shouldBe Some(expiration.getEpochSecond.toString)
+    query.get("Signature") shouldBe defined
+    query.get("Key-Pair-Id").value shouldBe keyPairId
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreCloudFrontTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreCloudFrontTests.scala
new file mode 100644
index 0000000..f2c8b7e
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStoreCloudFrontTests.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.s3
+import org.apache.openwhisk.core.entity.WhiskEntity
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class S3AttachmentStoreCloudFrontTests extends S3AttachmentStoreBehaviorBase 
with S3Aws {
+  override lazy val store = makeS3Store[WhiskEntity]
+
+  override def storeType: String = "S3_CloudFront"
+  override def cloudFrontConfig: String =
+    """
+      |cloud-front-config {
+      |  domain-name = ${CLOUDFRONT_DOMAIN_NAME}
+      |  key-pair-id = ${CLOUDFRONT_KEY_PAIR_ID}
+      |  private-key = ${CLOUDFRONT_PRIVATE_KEY}
+      |}
+    """.stripMargin
+
+  override protected def withFixture(test: NoArgTest) = {
+    assume(
+      System.getenv("CLOUDFRONT_PRIVATE_KEY") != null,
+      "Configure following env variables for test " +
+        "to run 'CLOUDFRONT_DOMAIN_NAME', 'CLOUDFRONT_KEY_PAIR_ID', 
'CLOUDFRONT_PRIVATE_KEY'")
+    super.withFixture(test)
+  }
+
+  //With CloudFront deletes are not immediate and instead the objects may live 
in CDN cache until the TTL expires
+  override protected val lazyDeletes = true
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala 
b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala
index 8e52966..2a8f558 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/s3/S3Aws.scala
@@ -27,6 +27,9 @@ import org.apache.openwhisk.core.database.{AttachmentStore, 
DocumentSerializer}
 import scala.reflect.ClassTag
 
 trait S3Aws extends FlatSpec {
+
+  def cloudFrontConfig: String = ""
+
   def makeS3Store[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: 
ActorSystem,
                                                        logging: Logging,
                                                        materializer: 
ActorMaterializer): AttachmentStore = {
@@ -47,17 +50,18 @@ trait S3Aws extends FlatSpec {
        |         }
        |      }
        |      bucket = "$bucket"
+       |      $cloudFrontConfig
        |    }
        |}
-      """.stripMargin).withFallback(ConfigFactory.load())
+      """.stripMargin).withFallback(ConfigFactory.load()).resolve()
     S3AttachmentStoreProvider.makeStore[D](config)
   }
 
   override protected def withFixture(test: NoArgTest) = {
     assume(
       secretAccessKey != null,
-      s"'AWS_SECRET_ACCESS_KEY' env not configured. Configure following " +
-        s"env variables for test to run. 'AWS_ACCESS_KEY_ID', 
'AWS_SECRET_ACCESS_KEY', 'AWS_REGION'")
+      "'AWS_SECRET_ACCESS_KEY' env not configured. Configure following " +
+        "env variables for test to run. 'AWS_ACCESS_KEY_ID', 
'AWS_SECRET_ACCESS_KEY', 'AWS_REGION'")
 
     require(accessKeyId != null, "'AWS_ACCESS_KEY_ID' env variable not set")
     require(region != null, "'AWS_REGION' env variable not set")
@@ -65,7 +69,7 @@ trait S3Aws extends FlatSpec {
     super.withFixture(test)
   }
 
-  val bucket = "test-ow-travis"
+  val bucket = Option(System.getenv("AWS_BUCKET")).getOrElse("test-ow-travis")
 
   val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
   val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
index 9caabe6..56a9d29 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/AttachmentStoreBehaviors.scala
@@ -59,6 +59,12 @@ trait AttachmentStoreBehaviors
 
   def garbageCollectAttachments: Boolean = true
 
+  /**
+   * In some cases, such as when the CloudFront CDN is used, deletes are not 
immediately reflected in reads
+   * because the objects are still present in the cache. For such cases we relax 
some of the test assertions.
+   */
+  protected def lazyDeletes: Boolean = false
+
   behavior of s"$storeType AttachmentStore"
 
   it should "add and read attachment" in {
@@ -108,19 +114,19 @@ trait AttachmentStoreBehaviors
     store.deleteAttachment(docId, "c1").futureValue shouldBe true
 
     //Non deleted attachments related to same docId must still be accessible
-    attachmentBytes(docId, "c1").failed.futureValue shouldBe 
a[NoDocumentException]
+    if (!lazyDeletes) attachmentBytes(docId, "c1").failed.futureValue shouldBe 
a[NoDocumentException]
     attachmentBytes(docId, "c2").futureValue.result() shouldBe ByteString(b2)
     attachmentBytes(docId, "c3").futureValue.result() shouldBe ByteString(b3)
 
     //Delete all attachments
     store.deleteAttachments(docId).futureValue shouldBe true
 
-    attachmentBytes(docId, "c2").failed.futureValue shouldBe 
a[NoDocumentException]
-    attachmentBytes(docId, "c3").failed.futureValue shouldBe 
a[NoDocumentException]
+    if (!lazyDeletes) attachmentBytes(docId, "c2").failed.futureValue shouldBe 
a[NoDocumentException]
+    if (!lazyDeletes) attachmentBytes(docId, "c3").failed.futureValue shouldBe 
a[NoDocumentException]
 
     //Make sure doc2 attachments are left untouched
-    attachmentBytes(docId2, "c21").futureValue.result() shouldBe ByteString(b1)
-    attachmentBytes(docId2, "c22").futureValue.result() shouldBe ByteString(b2)
+    if (!lazyDeletes) attachmentBytes(docId2, "c21").futureValue.result() 
shouldBe ByteString(b1)
+    if (!lazyDeletes) attachmentBytes(docId2, "c22").futureValue.result() 
shouldBe ByteString(b2)
   }
 
   it should "throw NoDocumentException on reading non existing attachment" in {

Reply via email to