This is an automated email from the ASF dual-hosted git repository.
tysonnorris pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 1c3de9f Update CosmosDB to 2.4.2 (#4321)
1c3de9f is described below
commit 1c3de9f6efdbcd52f6c2d9767fdb83419aa7c771
Author: Chetan Mehrotra <[email protected]>
AuthorDate: Tue Mar 5 07:23:24 2019 +0530
Update CosmosDB to 2.4.2 (#4321)
* Update to CosmosDB SDK 2.4.2
* Remove logic related to storing attachments in CosmosDB
* Reduce throughput for test to 400
---
common/scala/build.gradle | 2 +-
.../database/cosmosdb/CosmosDBArtifactStore.scala | 134 ++-------------------
tests/src/test/resources/application.conf.j2 | 7 +-
.../cosmosdb/CosmosDBArtifactStoreTests.scala | 27 ++---
.../ArtifactStoreAttachmentBehaviors.scala | 5 +-
5 files changed, 29 insertions(+), 146 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index fc7d540..1b1421b 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -86,7 +86,7 @@ dependencies {
compile 'io.reactivex:rxscala_2.12:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
- compile ('com.microsoft.azure:azure-cosmosdb:2.4.0'){
+ compile ('com.microsoft.azure:azure-cosmosdb:2.4.2'){
exclude group: 'commons-logging'
}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
index 79bd179..d7bdee1 100644
---
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStore.scala
@@ -17,18 +17,15 @@
package org.apache.openwhisk.core.database.cosmosdb
-import java.io.ByteArrayInputStream
-
import _root_.rx.RxReactiveStreams
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentType, StatusCodes, Uri}
import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{Sink, Source, StreamConverters}
-import akka.util.{ByteString, ByteStringBuilder}
+import akka.stream.scaladsl.{Sink, Source}
+import akka.util.ByteString
import com.microsoft.azure.cosmosdb._
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient
import kamon.metric.MeasurementUnit
-import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue,
RootJsonFormat, _}
import org.apache.openwhisk.common.{LogMarkerToken, Logging, LoggingMarkers,
MetricEmitter, TransactionId}
import org.apache.openwhisk.core.database.StoreUtils.{checkDocHasRevision,
deserialize, reportFailure}
import org.apache.openwhisk.core.database._
@@ -37,6 +34,7 @@ import
org.apache.openwhisk.core.database.cosmosdb.CosmosDBConstants._
import org.apache.openwhisk.core.entity.Attachments.Attached
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.http.Messages
+import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue,
RootJsonFormat, _}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
@@ -74,7 +72,7 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
private val getToken = createToken("get")
private val queryToken = createToken("query")
private val countToken = createToken("count")
- private val putAttachmentToken = createToken("putAttachment", read = false)
+
private val clusterIdValue = config.clusterId.map(JsString(_))
logging.info(
@@ -313,96 +311,15 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
contentType: ContentType,
docStream: Source[ByteString, _],
oldAttachment: Option[Attached])(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
-
- val asJson = doc.toDocumentRecord
- val id = asJson.fields("_id").convertTo[String].trim
-
attachmentStore match {
case Some(as) =>
attachToExternalStore(doc, update, contentType, docStream,
oldAttachment, as)
case None =>
- attachToCosmos(id, doc, update, contentType, docStream, oldAttachment)
+ Future.failed(new IllegalArgumentException(
+ s" '$cosmosScheme' is now not supported. You must configure an
external AttachmentStore for storing attachments"))
}
}
- private def attachToCosmos[A <: DocumentAbstraction](
- id: String,
- doc: A,
- update: (A, Attached) => A,
- contentType: ContentType,
- docStream: Source[ByteString, _],
- oldAttachment: Option[Attached])(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
- //Convert Source to ByteString as Cosmos API works with InputStream only
- for {
- allBytes <- toByteString(docStream)
- bytesOrSource <- inlineOrAttach(Source.single(allBytes))
- uri = uriOf(bytesOrSource, UUID().asString)
- attached <- {
- val a = bytesOrSource match {
- case Left(bytes) => Attached(uri.toString, contentType,
Some(bytes.size), Some(digest(bytes)))
- case Right(_) => Attached(uri.toString, contentType,
Some(allBytes.size), Some(digest(allBytes)))
- }
- Future.successful(a)
- }
- i1 <- put(update(doc, attached))
- i2 <- bytesOrSource match {
- case Left(_) => Future.successful(i1)
- case Right(s) => attach(i1, uri.path.toString,
attached.attachmentType, allBytes)
- }
- //Remove old attachment if it was part of attachmentStore
- _ <- oldAttachment
- .map { old =>
- val oldUri = Uri(old.attachmentName)
- if (oldUri.scheme == cosmosScheme) {
- val name = oldUri.path.toString
- val docId = DocId(id)
- client.deleteAttachment(s"${selfLinkOf(docId)}/attachments/$name",
newRequestOption(docId)).head()
- } else {
- Future.successful(true)
- }
- }
- .getOrElse(Future.successful(true))
- } yield (i2, attached)
- }
-
- private def attach(doc: DocInfo, name: String, contentType: ContentType,
allBytes: ByteString)(
- implicit transid: TransactionId): Future[DocInfo] = {
- val start = transid.started(
- this,
- LoggingMarkers.DATABASE_ATT_SAVE,
- s"[ATT_PUT] '$collName' uploading attachment '$name' of document '$doc'")
-
- checkDocHasRevision(doc)
- val options = new MediaOptions
- options.setContentType(contentType.toString())
- options.setSlug(name)
- val s = new ByteArrayInputStream(allBytes.toArray)
- val f = client
- .upsertAttachment(selfLinkOf(doc.id), s, options, matchRevOption(doc))
- .head()
- .transform(
- { r =>
- transid
- .finished(this, start, s"[ATT_PUT] '$collName' completed uploading
attachment '$name' of document '$doc'")
- collectMetrics(putAttachmentToken, r.getRequestCharge)
- doc //Adding attachment does not change the revision of document. So
retain the doc info
- }, {
- case e: DocumentClientException if isConflict(e) =>
- transid
- .finished(this, start, s"[ATT_PUT] '$collName' uploading
attachment '$name' of document '$doc'; conflict")
- DocumentConflictException("conflict on 'attachment put'")
- case e => e
- })
-
- reportFailure(
- f,
- start,
- failure => s"[ATT_PUT] '$collName' internal error, name: '$name', doc:
'$doc', failure: '${failure.getMessage}'")
- }
-
- private def toByteString(docStream: Source[ByteString, _]) =
- docStream.runFold(new ByteStringBuilder)((builder, b) => builder ++=
b).map(_.result().compact)
-
override protected[core] def readAttachment[T](doc: DocInfo, attached:
Attached, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
val name = attached.attachmentName
@@ -413,7 +330,8 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
case s if s == cosmosScheme || attachmentUri.isRelative =>
//relative case is for compatibility with earlier naming approach
where attachment name would be like 'jarfile'
//Compared to current approach of '<scheme>:<name>'
- readAttachmentFromCosmos(doc, attachmentUri, sink)
+ Future.failed(new IllegalArgumentException(
+ s" '$cosmosScheme' is now not supported. You must configure an
external AttachmentStore for storing attachments"))
case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
attachmentStore.get.readAttachment(doc.id,
attachmentUri.path.toString, sink)
case _ =>
@@ -421,42 +339,6 @@ class CosmosDBArtifactStore[DocumentAbstraction <:
DocumentSerializer](protected
}
}
- private def readAttachmentFromCosmos[T](doc: DocInfo, attachmentUri: Uri,
sink: Sink[ByteString, Future[T]])(
- implicit transid: TransactionId): Future[T] = {
- val name = attachmentUri.path
- val start = transid.started(
- this,
- LoggingMarkers.DATABASE_ATT_GET,
- s"[ATT_GET] '$collName' finding attachment '$name' of document '$doc'")
- checkDocHasRevision(doc)
-
- val f = client
- .readAttachment(s"${selfLinkOf(doc.id)}/attachments/$name",
matchRevOption(doc))
- .head()
- .flatMap(a => client.readMedia(a.getResource.getMediaLink).head())
- .transform(
- { r =>
- //Here stream can only be fetched once
- StreamConverters
- .fromInputStream(() => r.getMedia)
- .runWith(sink)
- }, {
- case e: DocumentClientException if isNotFound(e) =>
- transid.finished(
- this,
- start,
- s"[ATT_GET] '$collName', retrieving attachment '$name' of
document '$doc'; not found.")
- NoDocumentException("not found on 'delete'")
- case e => e
- })
- .flatMap(identity)
-
- reportFailure(
- f,
- start,
- failure => s"[ATT_GET] '$collName' internal error, name: '$name', doc:
'$doc', failure: '${failure.getMessage}'")
- }
-
override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit
transid: TransactionId): Future[Boolean] =
attachmentStore
.map(as => as.deleteAttachments(doc.id))
diff --git a/tests/src/test/resources/application.conf.j2
b/tests/src/test/resources/application.conf.j2
index a3f590d..20dc8cc 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -53,9 +53,10 @@ whisk {
}
cosmosdb {
- endpoint = ${?COSMOSDB_ENDPOINT}
- key = ${?COSMOSDB_KEY}
- db = ${?COSMOSDB_NAME}
+ endpoint = ${?COSMOSDB_ENDPOINT}
+ key = ${?COSMOSDB_KEY}
+ db = ${?COSMOSDB_NAME}
+ throughput = 400
}
controller {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
index 9898317..af2e8d8 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/CosmosDBArtifactStoreTests.scala
@@ -21,6 +21,8 @@ import com.typesafe.config.ConfigFactory
import io.netty.util.ResourceLeakDetector
import io.netty.util.ResourceLeakDetector.Level
import org.apache.openwhisk.common.TransactionId
+import org.apache.openwhisk.core.database.DocumentSerializer
+import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
import org.apache.openwhisk.core.entity.WhiskEntityQueries.TOP
import org.apache.openwhisk.core.entity.size._
@@ -34,19 +36,20 @@ import org.apache.openwhisk.core.entity.{
}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Pending}
+import org.scalatest.FlatSpec
import spray.json.JsString
+import org.apache.openwhisk.core.entity.size._
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+import scala.reflect.ClassTag
@RunWith(classOf[JUnitRunner])
class CosmosDBArtifactStoreTests extends FlatSpec with
CosmosDBStoreBehaviorBase with ArtifactStoreBehavior {
override protected def maxAttachmentSizeWithoutAttachmentStore = 1.MB
private var initialLevel: Level = _
- // See https://github.com/apache/incubator-openwhisk/issues/4286
- private val ignoredTests = Set(
- "CosmosDBArtifactStore attachments should fail on reading with old non
inlined attachment",
- "CosmosDBArtifactStore attachments should work on reading with old inlined
attachment",
- "CosmosDBArtifactStore attachments should put and read large attachment")
override protected def beforeAll(): Unit = {
RecordingLeakDetectorFactory.register()
@@ -55,6 +58,9 @@ class CosmosDBArtifactStoreTests extends FlatSpec with
CosmosDBStoreBehaviorBase
super.beforeAll()
}
+ override protected def getAttachmentStore[D <: DocumentSerializer:
ClassTag]() =
+ Some(MemoryAttachmentStoreProvider.makeStore[D]())
+
override def afterAll(): Unit = {
super.afterAll()
ResourceLeakDetector.setLevel(initialLevel)
@@ -67,15 +73,6 @@ class CosmosDBArtifactStoreTests extends FlatSpec with
CosmosDBStoreBehaviorBase
}
}
- override protected def withFixture(test: NoArgTest) = {
- val outcome = super.withFixture(test)
- val result = if (outcome.isFailed && ignoredTests.contains(test.name)) {
- println(s"Ignoring failed test ${test.name}")
- Pending
- } else outcome
- result
- }
-
behavior of "CosmosDB Setup"
it should "be configured with default throughput" in {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
index ef571ba..6d4a247 100644
---
a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreAttachmentBehaviors.scala
@@ -170,12 +170,15 @@ trait ArtifactStoreAttachmentBehaviors extends
ArtifactStoreBehaviorBase with Ex
it should "throw NoDocumentException for non existing attachment" in {
implicit val tid: TransactionId = transid()
+ val attachmentName = "foo"
+ val attachmentId =
+ getAttachmentStore(entityStore).map(s =>
s"${s.scheme}:$attachmentName").getOrElse(attachmentName)
val sink = StreamConverters.fromOutputStream(() => new
ByteArrayOutputStream())
entityStore
.readAttachment[IOResult](
DocInfo ! ("non-existing-doc", "42"),
- Attached("foo", ContentTypes.`application/octet-stream`),
+ Attached(attachmentId, ContentTypes.`application/octet-stream`),
sink)
.failed
.futureValue shouldBe a[NoDocumentException]