jasonpet closed pull request #293: Update MessageHub parameter validation
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/293
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/action/messageHubFeedWeb.js b/action/messageHubFeedWeb.js
index 76baae7..8f8d81b 100644
--- a/action/messageHubFeedWeb.js
+++ b/action/messageHubFeedWeb.js
@@ -175,6 +175,26 @@ function validateParameters(rawParams) {
             }
         }
 
+        validatedParams.isMessageHub = true;
+
+        return validateMessageHubParameters(rawParams.__bx_creds && 
rawParams.__bx_creds.messagehub ? rawParams.__bx_creds.messagehub : rawParams)
+        .then(p => {
+            validatedParams = Object.assign(validatedParams, p)
+            resolve(validatedParams)
+        })
+        .catch(error => {
+            reject(error);
+            return;
+        })
+    });
+
+    return promise;
+}
+
+function validateMessageHubParameters(rawParams) {
+    var promise = new Promise((resolve, reject) => {
+        var validatedParams = {};
+
         // kafka_brokers_sasl
         if (rawParams.kafka_brokers_sasl) {
             validatedParams.brokers = 
common.validateBrokerParam(rawParams.kafka_brokers_sasl);
@@ -211,8 +231,6 @@ function validateParameters(rawParams) {
             return;
         }
 
-        validatedParams.isMessageHub = true;
-
         resolve(validatedParams);
     });
 
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala 
b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d970d4b..04ed5c9 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -468,6 +468,59 @@ class MessageHubFeedTests
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
+  it should "create a trigger with __bx_creds and fire a trigger when a 
message is posted to message hub" in withAssetCleaner(wskprops) {
+    val currentTime = s"${System.currentTimeMillis}"
+
+    (wp, assetHelper) =>
+      val key = "TheKey"
+      val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+      println(s"Creating trigger $triggerName")
+
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "__bx_creds" -> Map(
+          "messagehub" -> Map(
+            "user" -> kafkaUtils.getAsJson("user"),
+            "password" -> kafkaUtils.getAsJson("password"),
+            "api_key" -> kafkaUtils.getAsJson("api_key"),
+            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
+            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+        "topic" -> topic.toJson
+      ))
+
+      val defaultAction1 = Some("dat/createTriggerActions.js")
+      val defaultActionName = s"helloKafka-${currentTime}"
+
+      assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) 
=>
+        action.create(name, defaultAction1)
+      }
+      assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+        rule.create(name, trigger = triggerName, action = defaultActionName)
+      }
+
+      val verificationName1 = s"trigger1-$currentTime"
+
+      assetHelper.withCleaner(wsk.trigger, verificationName1) { (trigger, 
name) =>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      println("Giving the consumer a moment to get ready")
+      Thread.sleep(consumerInitTime)
+
+      println("Producing a message")
+      withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
+        "user" -> kafkaUtils.getAsJson("user"),
+        "password" -> kafkaUtils.getAsJson("password"),
+        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "topic" -> topic.toJson,
+        "key" -> key.toJson,
+        "value" -> verificationName1.toJson
+      ))) {
+        _.response.success shouldBe true
+      }
+
+      retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
+  }
+
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: 
Map[String, spray.json.JsValue]) = {
     val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
       (trigger, _) =>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to