This is an automated email from the ASF dual-hosted git repository. japetrsn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push: new 1b28f89 Ensure test consumers exists before producing messages (#292) 1b28f89 is described below commit 1b28f89556c9356ebf9043bdaf78f1dec90f6370 Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Wed Nov 7 10:22:46 2018 -0500 Ensure test consumers exists before producing messages (#292) * Ensure test consumers exists before producing messages * Refactoring * More refactoring --- .../test/scala/system/health/BasicHealthTest.scala | 135 +++-------------- .../scala/system/packages/KafkaProduceTests.scala | 7 +- .../system/packages/MessageHubFeedTests.scala | 163 ++++++++------------- .../packages/MessageHubMultiWorkersTest.scala | 31 +--- .../system/packages/MessageHubProduceTests.scala | 65 +++----- .../src/test/scala/system/stress/StressTest.scala | 15 +- tests/src/test/scala/system/utils/KafkaUtils.scala | 81 +++++++++- 7 files changed, 185 insertions(+), 312 deletions(-) diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala index 2b99938..949bce6 100644 --- a/tests/src/test/scala/system/health/BasicHealthTest.scala +++ b/tests/src/test/scala/system/health/BasicHealthTest.scala @@ -19,7 +19,6 @@ package system.health import java.util.concurrent.{TimeUnit, TimeoutException} -import com.jayway.restassured.RestAssured import common.TestUtils.NOT_FOUND import common._ import org.apache.kafka.clients.producer.ProducerRecord @@ -43,7 +42,8 @@ class BasicHealthTest with TestHelpers with WskTestHelpers with Inside - with JsHelpers { + with JsHelpers + with KafkaUtils { val topic = "test" val sessionTimeout = 10 seconds @@ -55,125 +55,24 @@ class BasicHealthTest val messageHubFeed = "messageHubFeed" val messageHubProduce = "messageHubProduce" val actionName = s"$messagingPackage/$messageHubFeed" - - val consumerInitTime = 10000 // ms - - val kafkaUtils = new KafkaUtils - val maxRetries = System.getProperty("max.retries", "60").toInt behavior of "Message Hub feed" - it should "create a new trigger" in withAssetCleaner(wskprops) { - val triggerName = s"newTrigger-${System.currentTimeMillis}" - println(s"Creating trigger $triggerName") - - (wp, assetHelper) => - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) { - (trigger, _) => - trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson - )) - } - - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - activation => - // should be successful - activation.response.success shouldBe true - - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) - - val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "") - - println("Checking health endpoint(s) for existence of consumer uuid") - // get /health endpoint(s) and ensure it contains the new uuid - val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty) - healthUrls shouldNot be(empty) - - retry({ - val uuids = healthUrls.flatMap(u => { - val response = RestAssured.given().get(u) - response.statusCode() should be(200) - response.asString() - .parseJson - .asJsObject - .getFields("consumers") - .head - .convertTo[JsArray] - .elements - .flatMap(c => { - c.asJsObject.fields.keySet - }) - }).toList - - uuids should contain(uuid) - - }, N = 10, waitBeforeRetry = Some(1.second)) - } - } - - it should "fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) { + it should "create a consumer and fire a trigger when a message is posted to messagehub" in withAssetCleaner(wskprops) { val currentTime = s"${System.currentTimeMillis}" (wp, assetHelper) => val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger $triggerName") - - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) { - (trigger, _) => - trigger.create(triggerName, feed = Some(actionName), parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson - )) - } - - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - activation => - // should be successful - activation.response.success shouldBe true - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) - - val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "") - - println("Checking health endpoint(s) for existence of consumer uuid") - // get /health endpoint(s) and ensure it contains the new uuid - val healthUrls = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty) - healthUrls shouldNot be(empty) - - retry({ - val uuids = healthUrls.flatMap(u => { - val response = RestAssured.given().get(u) - response.statusCode() should be(200) - response.asString() - .parseJson - .asJsObject - .getFields("consumers") - .head - .convertTo[JsArray] - .elements - .flatMap(c => { - c.asJsObject.fields.keySet - }) - }).toList - - uuids should contain(uuid) - - }, N = 10, waitBeforeRetry = Some(1.second)) - } + createTrigger(assetHelper, triggerName, parameters = Map( + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), + "topic" -> topic.toJson + )) // This action creates a trigger if it gets executed. // The name of the trigger will be the message, that has been send to kafka. @@ -201,7 +100,7 @@ class BasicHealthTest } println(s"Producing message with key: $key and value: $verificationName") - val producer = kafkaUtils.createProducer() + val producer = createProducer() val record = new ProducerRecord(topic, key, verificationName) val future = producer.send(record) @@ -231,17 +130,17 @@ class BasicHealthTest val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" println(s"Creating trigger $triggerName") - val username = kafkaUtils.getAsJson("user") - val password = kafkaUtils.getAsJson("password") - val admin_url = kafkaUtils.getAsJson("kafka_admin_url") - val brokers = kafkaUtils.getAsJson("brokers") + val username = getAsJson("user") + val password = getAsJson("password") + val admin_url = getAsJson("kafka_admin_url") + val brokers = getAsJson("brokers") val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) { (trigger, _) => trigger.create(triggerName, feed = Some(actionName), parameters = Map( "user" -> username, "password" -> password, - "api_key" -> kafkaUtils.getAsJson("api_key"), + "api_key" -> getAsJson("api_key"), "kafka_admin_url" -> admin_url, "kafka_brokers_sasl" -> brokers, "topic" -> topic.toJson, diff --git a/tests/src/test/scala/system/packages/KafkaProduceTests.scala b/tests/src/test/scala/system/packages/KafkaProduceTests.scala index 85d5007..e3d9094 100644 --- a/tests/src/test/scala/system/packages/KafkaProduceTests.scala +++ b/tests/src/test/scala/system/packages/KafkaProduceTests.scala @@ -45,7 +45,8 @@ class KafkaProduceTests with BeforeAndAfterAll with TestHelpers with WskTestHelpers - with JsHelpers { + with JsHelpers + with KafkaUtils { val topic = "test" val sessionTimeout = 10 seconds @@ -56,8 +57,6 @@ class KafkaProduceTests val actionName = "kafkaProduceAction" val actionFile = "../action/kafkaProduce.py" - val kafkaUtils = new KafkaUtils - behavior of "Kafka Produce action" override def beforeAll() { @@ -73,7 +72,7 @@ class KafkaProduceTests def testMissingParameter(missingParam : String) = { var fullParamsMap = Map( "topic" -> topic.toJson, - "brokers" -> kafkaUtils.getAsJson("brokers"), + "brokers" -> getAsJson("brokers"), "value" -> "This will fail".toJson) var missingParamsMap = fullParamsMap.filterKeys(_ != missingParam) diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala index 04ed5c9..4263a1d 100644 --- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala @@ -36,7 +36,6 @@ import common.WskActorSystem import common.WskProps import common.WskTestHelpers import ActionHelper._ - import common.TestUtils.NOT_FOUND import whisk.utils.retry @@ -49,19 +48,15 @@ class MessageHubFeedTests with BeforeAndAfterAll with TestHelpers with WskTestHelpers - with JsHelpers { + with JsHelpers + with KafkaUtils { val topic = "test" val sessionTimeout = 10 seconds - val messagingPackage = "/whisk.system/messaging" val messageHubFeed = "messageHubFeed" val messageHubProduce = "messageHubProduce" - val consumerInitTime = 10000 // ms - - val kafkaUtils = new KafkaUtils - val maxRetries = System.getProperty("max.retries", "60").toInt implicit val wskprops = WskProps() @@ -126,14 +121,13 @@ class MessageHubFeedTests (wp, assetHelper) => val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger ${triggerName}") createTrigger(assetHelper, triggerName, parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson, "isBinaryKey" -> false.toJson, "isBinaryValue" -> false.toJson)) @@ -158,20 +152,15 @@ class MessageHubFeedTests trigger.get(name, NOT_FOUND) } - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) - // Rapidly produce two messages whose size are each greater than half the allowed payload limit. // This should ensure that the feed fires these as two separate triggers. println("Rapidly producing two large messages") - val producer = kafkaUtils.createProducer() + val producer = createProducer() val firstMessage = new ProducerRecord(topic, verificationName1, generateMessage(s"first${currentTime}", testPayloadSize)) val secondMessage = new ProducerRecord(topic, verificationName2, generateMessage(s"second${currentTime}", testPayloadSize)) producer.send(firstMessage) producer.send(secondMessage) producer.close() - retry(wsk.trigger.get(verificationName1), 60, Some(1.second)) retry(wsk.trigger.get(verificationName2), 60, Some(1.second)) } @@ -185,14 +174,13 @@ class MessageHubFeedTests (wp, assetHelper) => val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger ${triggerName}") createTrigger(assetHelper, triggerName, parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson, "isBinaryKey" -> false.toJson, "isBinaryValue" -> false.toJson)) @@ -211,12 +199,8 @@ class MessageHubFeedTests wsk.trigger.get(verificationName, NOT_FOUND) - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) - println("Producing an oversized message") - val producer = kafkaUtils.createProducer() + val producer = createProducer() val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize)) producer.send(bigMessage) producer.close() @@ -229,17 +213,15 @@ class MessageHubFeedTests (wp, assetHelper) => val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger ${triggerName}") - - val username = kafkaUtils.getAsJson("user") - val password = kafkaUtils.getAsJson("password") - val admin_url = kafkaUtils.getAsJson("kafka_admin_url") - val brokers = kafkaUtils.getAsJson("brokers") + val username = getAsJson("user") + val password = getAsJson("password") + val admin_url = getAsJson("kafka_admin_url") + val brokers = getAsJson("brokers") createTrigger(assetHelper, triggerName, parameters = Map( "user" -> username, "password" -> password, - "api_key" -> kafkaUtils.getAsJson("api_key"), + "api_key" -> getAsJson("api_key"), "kafka_admin_url" -> admin_url, "kafka_brokers_sasl" -> brokers, "topic" -> topic.toJson, @@ -263,17 +245,15 @@ class MessageHubFeedTests (wp, assetHelper) => val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger $triggerName") - - val username = kafkaUtils.getAsJson("user") - val password = kafkaUtils.getAsJson("password") - val admin_url = kafkaUtils.getAsJson("kafka_admin_url") - val brokers = kafkaUtils.getAsJson("brokers") + val username = getAsJson("user") + val password = getAsJson("password") + val admin_url = getAsJson("kafka_admin_url") + val brokers = getAsJson("brokers") createTrigger(assetHelper, triggerName, parameters = Map( "user" -> username, "password" -> password, - "api_key" -> kafkaUtils.getAsJson("api_key"), + "api_key" -> getAsJson("api_key"), "kafka_admin_url" -> admin_url, "kafka_brokers_sasl" -> brokers, "topic" -> topic.toJson, @@ -299,31 +279,22 @@ class MessageHubFeedTests (wp, assetHelper) => val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger $triggerName") - - val username = kafkaUtils.getAsJson("user") - val password = kafkaUtils.getAsJson("password") - val admin_url = kafkaUtils.getAsJson("kafka_admin_url") - val brokers = kafkaUtils.getAsJson("brokers") - - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) { - (trigger, _) => - trigger.create(triggerName, feed = Some(actionName), parameters = Map( - "user" -> username, - "password" -> password, - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> admin_url, - "kafka_brokers_sasl" -> brokers, - "topic" -> topic.toJson, - "isJSONData" -> true.toJson, - "isBinaryKey" -> false.toJson, - "isBinaryValue" -> false.toJson - )) - } + val username = getAsJson("user") + val password = getAsJson("password") + val admin_url = getAsJson("kafka_admin_url") + val brokers = getAsJson("brokers") - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - _.response.success shouldBe true - } + createTrigger(assetHelper, triggerName, parameters = Map( + "user" -> username, + "password" -> password, + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> admin_url, + "kafka_brokers_sasl" -> brokers, + "topic" -> topic.toJson, + "isJSONData" -> true.toJson, + "isBinaryKey" -> false.toJson, + "isBinaryValue" -> false.toJson + )) val readRunResult = wsk.action.invoke(actionName, parameters = Map( "triggerName" -> triggerName.toJson, @@ -384,14 +355,13 @@ class MessageHubFeedTests (wp, assetHelper) => val key = "TheKey" val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger $triggerName") createTrigger(assetHelper, triggerName, parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson )) @@ -411,14 +381,11 @@ class MessageHubFeedTests trigger.get(name, NOT_FOUND) } - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) - println("Producing a message") withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson, "key" -> key.toJson, "value" -> verificationName1.toJson @@ -455,9 +422,9 @@ class MessageHubFeedTests println("Producing a message") withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson, "key" -> key.toJson, "value" -> verificationName2.toJson @@ -474,16 +441,15 @@ class MessageHubFeedTests (wp, assetHelper) => val key = "TheKey" val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" - println(s"Creating trigger $triggerName") createTrigger(assetHelper, triggerName, parameters = Map( "__bx_creds" -> Map( "messagehub" -> Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson, + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"))).toJson, "topic" -> topic.toJson )) @@ -508,9 +474,9 @@ class MessageHubFeedTests println("Producing a message") withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson, "key" -> key.toJson, "value" -> verificationName1.toJson @@ -521,19 +487,6 @@ class MessageHubFeedTests retry(wsk.trigger.get(verificationName1), 60, Some(1.second)) } - def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = { - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) { - (trigger, _) => - trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters) - } - - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - activation => - // should be successful - activation.response.success shouldBe true - } - } - def generateMessage(prefix: String, size: Int): String = { val longString = Array.fill[String](size)("0").mkString("") s"${prefix}${longString}" diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala index 4170ab2..baedd0c 100644 --- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala +++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala @@ -19,7 +19,6 @@ package system.packages import system.utils.KafkaUtils import scala.concurrent.duration.DurationInt -import scala.language.postfixOps import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec @@ -49,7 +48,8 @@ class MessageHubMultiWorkersTest extends FlatSpec with TestHelpers with WskTestHelpers with JsHelpers - with StreamLogging { + with StreamLogging + with KafkaUtils { val topic = "test" @@ -58,7 +58,6 @@ class MessageHubMultiWorkersTest extends FlatSpec val messagingPackage = "/whisk.system/messaging" val messageHubFeed = "messageHubFeed" - val dbProtocol = WhiskProperties.getProperty("db.protocol") val dbHost = WhiskProperties.getProperty("db.host") val dbPort = WhiskProperties.getProperty("db.port").toInt @@ -66,11 +65,8 @@ class MessageHubMultiWorkersTest extends FlatSpec val dbPassword = WhiskProperties.getProperty("db.password") val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix) val dbName = s"${dbPrefix}ow_kafka_triggers" - val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, dbUsername, dbPassword, dbName) - val kafkaUtils = new KafkaUtils - behavior of "Mussage Hub Feed" ignore should "assign two triggers to same worker when only worker0 is available" in withAssetCleaner(wskprops) { @@ -187,26 +183,13 @@ class MessageHubMultiWorkersTest extends FlatSpec }) } - def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = { - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) { - (trigger, _) => - trigger.create(name, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters) - } - - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - activation => - // should be successful - activation.response.success shouldBe true - } - } - def constructParams(workers: List[String]) = { Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson, "workers" -> workers.toJson ) diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala index 5cddefb..4ba4064 100644 --- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala @@ -52,7 +52,8 @@ class MessageHubProduceTests with BeforeAndAfterAll with TestHelpers with WskTestHelpers - with JsHelpers { + with JsHelpers + with KafkaUtils { val topic = "test" val sessionTimeout = 10 seconds @@ -63,19 +64,15 @@ class MessageHubProduceTests val messagingPackage = "/whisk.system/messaging" val messageHubFeed = "messageHubFeed" val messageHubProduce = "messageHubProduce" - val consumerInitTime = 10000 // ms - - val kafkaUtils = new KafkaUtils - val maxRetries = System.getProperty("max.retries", "60").toInt // these parameter values are 100% valid and should work as-is val validParameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), "topic" -> topic.toJson, - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "value" -> "Big Trouble is actually a really good Tim Allen movie. Seriously.".toJson) behavior of "Message Hub Produce action" @@ -135,26 +132,14 @@ class MessageHubProduceTests (wp, assetHelper) => val triggerName = s"/_/binaryValueTrigger-$currentTime" - println(s"Creating trigger ${triggerName}") - - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) { - (trigger, _) => - trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson)) - } - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - _.response.success shouldBe true - } - - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) + createTrigger(assetHelper, triggerName, parameters = Map( + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), + "topic" -> topic.toJson)) val defaultAction = Some("dat/createTriggerActions.js") val defaultActionName = s"helloKafka-${currentTime}" @@ -191,26 +176,14 @@ class MessageHubProduceTests (wp, assetHelper) => val triggerName = s"/_/binaryKeyTrigger-$currentTime" - println(s"Creating trigger ${triggerName}") - - val feedCreationResult = assetHelper.withCleaner(wsk.trigger, triggerName) { - (trigger, _) => - trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson)) - } - - withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { - _.response.success shouldBe true - } - // It takes a moment for the consumer to fully initialize. - println("Giving the consumer a moment to get ready") - Thread.sleep(consumerInitTime) + createTrigger(assetHelper, triggerName, parameters = Map( + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), + "topic" -> topic.toJson)) val defaultAction = Some("dat/createTriggerActionsFromKey.js") val defaultActionName = s"helloKafka-${currentTime}" diff --git a/tests/src/test/scala/system/stress/StressTest.scala b/tests/src/test/scala/system/stress/StressTest.scala index b174323..bd413e0 100644 --- a/tests/src/test/scala/system/stress/StressTest.scala +++ b/tests/src/test/scala/system/stress/StressTest.scala @@ -42,7 +42,8 @@ class BasicStressTest with Matchers with WskActorSystem with TestHelpers - with WskTestHelpers { + with WskTestHelpers + with KafkaUtils { val topic = "test" val sessionTimeout = 10 seconds @@ -54,8 +55,6 @@ class BasicStressTest val messageHubFeed = "messageHubFeed" val messageHubProduce = "messageHubProduce" - val kafkaUtils = new KafkaUtils - behavior of "Message Hub provider" it should "rapidly create and delete many triggers" in { @@ -80,11 +79,11 @@ class BasicStressTest val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" println(s"\nCreating trigger #${iterationLabel}: ${triggerName}") val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "api_key" -> kafkaUtils.getAsJson("api_key"), - "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "user" -> getAsJson("user"), + "password" -> getAsJson("password"), + "api_key" -> getAsJson("api_key"), + "kafka_admin_url" -> getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> getAsJson("brokers"), "topic" -> topic.toJson)) println("Waiting for trigger create") diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala index be17d6c..e01c715 100644 --- a/tests/src/test/scala/system/utils/KafkaUtils.scala +++ b/tests/src/test/scala/system/utils/KafkaUtils.scala @@ -17,24 +17,29 @@ package system.utils -import common.TestUtils - import java.util.HashMap import java.util.Properties + +import com.jayway.restassured.RestAssured +import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig} import javax.security.auth.login.Configuration import javax.security.auth.login.AppConfigurationEntry - -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.KafkaProducer import scala.collection.mutable.ListBuffer - import spray.json.DefaultJsonProtocol._ import spray.json._ - +import system.packages.ActionHelper._ import whisk.utils.JsHelpers +import scala.concurrent.duration.DurationInt +import scala.language.postfixOps +import common.TestHelpers +import common.TestUtils +import common.WskTestHelpers +import whisk.utils.retry -class KafkaUtils { +trait KafkaUtils extends TestHelpers with WskTestHelpers { lazy val messageHubProps = KafkaUtils.initializeMessageHub() def createProducer() : KafkaProducer[String, String] = { @@ -52,9 +57,71 @@ class KafkaUtils { case key => this(key).asInstanceOf[String].toJson } } + + val sslconfig = { + val inner = new SSLConfig().allowAllHostnames() + val config = inner.relaxedHTTPSValidation() + new RestAssuredConfig().sslConfig(config) + } + + def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = { + println(s"Creating trigger $name") + + val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) { + (trigger, _) => + trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters) + } + + withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) { + activation => + // should be successful + activation.response.success shouldBe true + + // It takes a moment for the consumer to fully initialize. + println("Giving the consumer a moment to get ready") + Thread.sleep(KafkaUtils.consumerInitTime) + + val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "") + consumerExists(uuid) + } + } + + + def consumerExists(uuid: String) = { + println("Checking health endpoint(s) for existence of consumer uuid") + // get /health endpoint(s) and ensure it contains the new uuid + val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty) + assert(healthUrls.size != 0) + + retry({ + val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => { + val response = RestAssured.given().config(sslconfig).get(u) + assert(response.statusCode() == 200) + + response.asString() + .parseJson + .asJsObject + .getFields("consumers") + .head + .convertTo[JsArray] + .elements + .flatMap(c => { + val consumer = c.asJsObject.fields.head + consumer match { + case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer) + case _ => None + } + }) + }) + + assert(uuids.nonEmpty) + }, N = 60, waitBeforeRetry = Some(1.second)) + } } object KafkaUtils { + val consumerInitTime = 10000 // ms + def asKafkaProducerProps(props : Map[String,Object]) : Properties = { val requiredKeys = List("brokers", "user",