dubeejw closed pull request #260: Update MessageHubMultiWorkersTest to use
ExtendedCouchDbRestClient
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/260
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index f6ce723..7ea14ec 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -20,24 +20,26 @@ import system.utils.KafkaUtils
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
-
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
-
import common.JsHelpers
import common.TestHelpers
import common.StreamLogging
+import common.WhiskProperties
import common.Wsk
import common.WskActorSystem
import common.WskProps
import common.WskTestHelpers
import spray.json.DefaultJsonProtocol._
import spray.json.{pimpAny, _}
-import whisk.core.database.test.DatabaseScriptTestUtils
-import whisk.utils.JsHelpers
+import whisk.core.WhiskConfig
+import whisk.core.database.test.ExtendedCouchDbRestClient
+import whisk.utils.{JsHelpers, retry}
+
+import scala.concurrent.Await
@RunWith(classOf[JUnitRunner])
class MessageHubMultiWorkersTest extends FlatSpec
@@ -47,8 +49,7 @@ class MessageHubMultiWorkersTest extends FlatSpec
with TestHelpers
with WskTestHelpers
with JsHelpers
- with StreamLogging
- with DatabaseScriptTestUtils {
+ with StreamLogging {
val topic = "test"
@@ -57,8 +58,17 @@ class MessageHubMultiWorkersTest extends FlatSpec
val messagingPackage = "/whisk.system/messaging"
val messageHubFeed = "messageHubFeed"
+
+ val dbProtocol = WhiskProperties.getProperty("db.protocol")
+ val dbHost = WhiskProperties.getProperty("db.host")
+ val dbPort = WhiskProperties.getProperty("db.port").toInt
+ val dbUsername = WhiskProperties.getProperty("db.username")
+ val dbPassword = WhiskProperties.getProperty("db.password")
+ val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
val dbName = s"${dbPrefix}ow_kafka_triggers"
+ val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort,
dbUsername, dbPassword, dbName)
+
val kafkaUtils = new KafkaUtils
behavior of "Mussage Hub Feed"
@@ -76,10 +86,14 @@ class MessageHubMultiWorkersTest extends FlatSpec
createTrigger(assetHelper, firstTrigger, parameters)
createTrigger(assetHelper, secondTrigger, parameters)
- val documents =
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+ retry({
+ val result = Await.result(client.getAllDocs(includeDocs = Some(true)),
15.seconds)
+ result should be('right)
+ val documents =
result.right.get.fields("rows").convertTo[List[JsObject]]
- validateTriggerAssignment(documents, firstTrigger, worker0)
- validateTriggerAssignment(documents, secondTrigger, worker0)
+ validateTriggerAssignment(documents, firstTrigger, worker0)
+ validateTriggerAssignment(documents, secondTrigger, worker0)
+ })
}
it should "assign a trigger to worker0 and a trigger to worker1 when both
workers are available" in withAssetCleaner(wskprops) {
@@ -96,10 +110,14 @@ class MessageHubMultiWorkersTest extends FlatSpec
createTrigger(assetHelper, firstTrigger, parameters)
createTrigger(assetHelper, secondTrigger, parameters)
- val documents =
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+ retry({
+ val result = Await.result(client.getAllDocs(includeDocs = Some(true)),
15.seconds)
+ result should be('right)
+ val documents =
result.right.get.fields("rows").convertTo[List[JsObject]]
- validateTriggerAssignment(documents, firstTrigger, worker0)
- validateTriggerAssignment(documents, secondTrigger, worker1)
+ validateTriggerAssignment(documents, firstTrigger, worker0)
+ validateTriggerAssignment(documents, secondTrigger, worker1)
+ })
}
it should "assign a trigger to worker1 when worker0 is removed and there is
an assignment imbalance" in withAssetCleaner(wskprops) {
@@ -120,12 +138,16 @@ class MessageHubMultiWorkersTest extends FlatSpec
createTrigger(assetHelper, thirdTrigger, parameters =
constructParams(List(worker0, worker1)))
createTrigger(assetHelper, fourthTrigger, parameters =
constructParams(List(worker1)))
- val documents =
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+ retry({
+ val result = Await.result(client.getAllDocs(includeDocs = Some(true)),
15.seconds)
+ result should be('right)
+ val documents =
result.right.get.fields("rows").convertTo[List[JsObject]]
- validateTriggerAssignment(documents, firstTrigger, worker1)
- validateTriggerAssignment(documents, secondTrigger, worker1)
- validateTriggerAssignment(documents, thirdTrigger, worker0)
- validateTriggerAssignment(documents, fourthTrigger, worker1)
+ validateTriggerAssignment(documents, firstTrigger, worker1)
+ validateTriggerAssignment(documents, secondTrigger, worker1)
+ validateTriggerAssignment(documents, thirdTrigger, worker0)
+ validateTriggerAssignment(documents, fourthTrigger, worker1)
+ })
}
it should "balance the load accross workers when a worker is added" in
withAssetCleaner(wskprops) {
@@ -151,14 +173,18 @@ class MessageHubMultiWorkersTest extends FlatSpec
createTrigger(assetHelper, fifthTrigger, updatedParameters)
createTrigger(assetHelper, sixthTrigger, updatedParameters)
- val documents =
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
-
- validateTriggerAssignment(documents, firstTrigger, worker0)
- validateTriggerAssignment(documents, secondTrigger, worker0)
- validateTriggerAssignment(documents, thirdTrigger, worker1)
- validateTriggerAssignment(documents, fourthTrigger, worker1)
- validateTriggerAssignment(documents, fifthTrigger, worker0)
- validateTriggerAssignment(documents, sixthTrigger, worker1)
+ retry({
+ val result = Await.result(client.getAllDocs(includeDocs = Some(true)),
15.seconds)
+ result should be('right)
+ val documents =
result.right.get.fields("rows").convertTo[List[JsObject]]
+
+ validateTriggerAssignment(documents, firstTrigger, worker0)
+ validateTriggerAssignment(documents, secondTrigger, worker0)
+ validateTriggerAssignment(documents, thirdTrigger, worker1)
+ validateTriggerAssignment(documents, fourthTrigger, worker1)
+ validateTriggerAssignment(documents, fifthTrigger, worker0)
+ validateTriggerAssignment(documents, sixthTrigger, worker1)
+ })
}
def createTrigger(assetHelper: AssetCleaner, name: String, parameters:
Map[String, spray.json.JsValue]) = {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services