This is an automated email from the ASF dual-hosted git repository. japetrsn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push: new dac4c12 Update MessageHub parameter validation (#293) dac4c12 is described below commit dac4c120e169879c06da29a16c1c68041bd15eb5 Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Mon Oct 29 23:28:29 2018 -0400 Update MessageHub parameter validation (#293) * Update MessageHub parameter validation * Review refactor --- action/messageHubFeedWeb.js | 22 ++++++++- .../system/packages/MessageHubFeedTests.scala | 53 ++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js index 66114a6..435d8fd 100644 --- a/action/messageHubFeedWeb.js +++ b/action/messageHubFeedWeb.js @@ -179,6 +179,26 @@ function validateParameters(rawParams) { } } + validatedParams.isMessageHub = true; + + return validateMessageHubParameters(rawParams.__bx_creds && rawParams.__bx_creds.messagehub ? rawParams.__bx_creds.messagehub : rawParams) + .then(p => { + validatedParams = Object.assign(validatedParams, p) + resolve(validatedParams) + }) + .catch(error => { + reject(error); + return; + }) + }); + + return promise; +} + +function validateMessageHubParameters(rawParams) { + var promise = new Promise((resolve, reject) => { + var validatedParams = {}; + // kafka_brokers_sasl if (rawParams.kafka_brokers_sasl) { validatedParams.brokers = common.validateBrokerParam(rawParams.kafka_brokers_sasl); @@ -215,8 +235,6 @@ function validateParameters(rawParams) { return; } - validatedParams.isMessageHub = true; - resolve(validatedParams); }); diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala index d970d4b..04ed5c9 100644 --- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala @@ -468,6 +468,59 @@ class MessageHubFeedTests retry(wsk.trigger.get(verificationName2), 60, Some(1.second)) } + it should "create a trigger with __bx_creds and fire a trigger when a message is posted to message hub" in withAssetCleaner(wskprops) { + val currentTime = s"${System.currentTimeMillis}" + + (wp, assetHelper) => + val key = "TheKey" + val triggerName = s"/_/dummyMessageHubTrigger-$currentTime" + println(s"Creating trigger $triggerName") + + createTrigger(assetHelper, triggerName, parameters = Map( + "__bx_creds" -> Map( + "messagehub" -> Map( + "user" -> kafkaUtils.getAsJson("user"), + "password" -> kafkaUtils.getAsJson("password"), + "api_key" -> kafkaUtils.getAsJson("api_key"), + "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"), + "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson, + "topic" -> topic.toJson + )) + + val defaultAction1 = Some("dat/createTriggerActions.js") + val defaultActionName = s"helloKafka-${currentTime}" + + assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => + action.create(name, defaultAction1) + } + assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) => + rule.create(name, trigger = triggerName, action = defaultActionName) + } + + val verificationName1 = s"trigger1-$currentTime" + + assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + + println("Giving the consumer a moment to get ready") + Thread.sleep(consumerInitTime) + + println("Producing a message") + withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( + "user" -> kafkaUtils.getAsJson("user"), + "password" -> kafkaUtils.getAsJson("password"), + "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), + "topic" -> topic.toJson, + "key" -> key.toJson, + "value" -> verificationName1.toJson + ))) { + _.response.success shouldBe true + } + + retry(wsk.trigger.get(verificationName1), 60, Some(1.second)) + } + def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = { val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) { (trigger, _) =>