This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new dac4c12  Update MessageHub parameter validation (#293)
dac4c12 is described below

commit dac4c120e169879c06da29a16c1c68041bd15eb5
Author: James Dubee <jwdu...@us.ibm.com>
AuthorDate: Mon Oct 29 23:28:29 2018 -0400

    Update MessageHub parameter validation (#293)
    
    * Update MessageHub parameter validation
    
    * Review refactor
---
 action/messageHubFeedWeb.js                        | 22 ++++++++-
 .../system/packages/MessageHubFeedTests.scala      | 53 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 66114a6..435d8fd 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -179,6 +179,26 @@ function validateParameters(rawParams) {
             }
         }
 
+        validatedParams.isMessageHub = true;
+
+        return validateMessageHubParameters(rawParams.__bx_creds && 
rawParams.__bx_creds.messagehub ? rawParams.__bx_creds.messagehub : rawParams)
+        .then(p => {
+            validatedParams = Object.assign(validatedParams, p)
+            resolve(validatedParams)
+        })
+        .catch(error => {
+            reject(error);
+            return;
+        })
+    });
+
+    return promise;
+}
+
+function validateMessageHubParameters(rawParams) {
+    var promise = new Promise((resolve, reject) => {
+        var validatedParams = {};
+
         // kafka_brokers_sasl
         if (rawParams.kafka_brokers_sasl) {
             validatedParams.brokers = 
common.validateBrokerParam(rawParams.kafka_brokers_sasl);
@@ -215,8 +235,6 @@ function validateParameters(rawParams) {
             return;
         }
 
-        validatedParams.isMessageHub = true;
-
         resolve(validatedParams);
     });
 
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d970d4b..04ed5c9 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -468,6 +468,59 @@ class MessageHubFeedTests
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
+  it should "create a trigger with __bx_creds and fire a trigger when a 
message is posted to message hub" in withAssetCleaner(wskprops) {
+    val currentTime = s"${System.currentTimeMillis}"
+
+    (wp, assetHelper) =>
+      val key = "TheKey"
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      println(s"Creating trigger $triggerName")
+
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "__bx_creds" -> Map(
+          "messagehub" -> Map(
+            "user" -> kafkaUtils.getAsJson("user"),
+            "password" -> kafkaUtils.getAsJson("password"),
+            "api_key" -> kafkaUtils.getAsJson("api_key"),
+            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+        "topic" -> topic.toJson
+      ))
+
+      val defaultAction1 = Some("dat/createTriggerActions.js")
+      val defaultActionName = s"helloKafka-${currentTime}"
+
+      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) 
=>
+        action.create(name, defaultAction1)
+      }
+      assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+        rule.create(name, trigger = triggerName, action = defaultActionName)
+      }
+
+      val verificationName1 = s"trigger1-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, 
name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      println("Giving the consumer a moment to get ready")
+      Thread.sleep(consumerInitTime)
+
+      println("Producing a message")
+      withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+        "user" -> kafkaUtils.getAsJson("user"),
+        "password" -> kafkaUtils.getAsJson("password"),
+        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "topic" -> topic.toJson,
+        "key" -> key.toJson,
+        "value" -> verificationName1.toJson
+      ))) {
+        _.response.success shouldBe true
+      }
+
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
+  }
+
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: 
Map[String, spray.json.JsValue]) = {
     val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
       (trigger, _) =>

Reply via email to