This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 1159e6b  Resiliency Updates for MessageHub Tests (#253)
1159e6b is described below

commit 1159e6b56c4ead50f460135b636d06eeadebc0a9
Author: James Dubee <jwdu...@us.ibm.com>
AuthorDate: Fri Mar 2 14:46:52 2018 -0500

    Resiliency Updates for MessageHub Tests (#253)
---
 .../test/scala/system/health/BasicHealthTest.scala |   8 +-
 .../system/packages/MessageHubFeedTests.scala      | 141 +++++++++++----------
 .../system/packages/MessageHubProduceTests.scala   | 104 +++++++--------
 3 files changed, 131 insertions(+), 122 deletions(-)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 02183d2..139ed21 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -141,9 +141,7 @@ class BasicHealthTest
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       // It takes a moment for the consumer to fully initialize.
@@ -280,9 +278,7 @@ class BasicHealthTest
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       val readRunResult = wsk.action.invoke(actionName, parameters = Map(
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 1fe18a1..e280e39 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -17,18 +17,21 @@
 package system.packages
 
 import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
+import org.apache.kafka.clients.producer.ProducerRecord
+
 import spray.json.DefaultJsonProtocol._
 import spray.json._
+
 import common.JsHelpers
 import common.TestUtils
 import common.TestHelpers
@@ -36,11 +39,14 @@ import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import ActionHelper._
+
 import java.util.Base64
 import java.nio.charset.StandardCharsets
 import java.time.{Clock, Instant}
 
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -166,28 +172,30 @@ class MessageHubFeedTests
         _.response.success shouldBe true
       }
 
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-      assert(activations.length == 1)
+      retry({
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
+        assert(activations.length == 1)
 
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
-      } yield activation.right.get
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 1)
+        assert(matchingActivations.length > 0)
 
-      val activation = matchingActivations.head
-      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+        val activation = matchingActivations.head
+        activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
 
-      // assert that there exists a message in the activation which has the 
expected keys and values
-      val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = encodedCurrentTime)
-      assert(messages.length == 1)
+        // assert that there exists a message in the activation which has the 
expected keys and values
+        val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = encodedCurrentTime)
+        assert(messages.length == 1)
 
-      val message = messages.head
-      message.getFieldPath("topic") shouldBe Some(topic.toJson)
-      message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+        val message = messages.head
+        message.getFieldPath("topic") shouldBe Some(topic.toJson)
+        message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+      }, N = 3)
   }
 
   it should "not fire a single trigger with an oversized payload" in 
withAssetCleaner(wskprops) {
@@ -234,19 +242,21 @@ class MessageHubFeedTests
       producer.send(secondMessage)
       producer.close()
 
-      // verify there are two trigger activations required to handle these 
messages
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 2, Some(triggerName), 
retries = maxRetries)
-
-      println("Verifying activation content")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && 
(activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")
 ||
-          
activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
-      } yield activation.right.get
-
-      assert(matchingActivations.length == 2)
+      retry({
+        // verify there are two trigger activations required to handle these 
messages
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 2, Some(triggerName), 
retries = maxRetries)
+
+        println("Verifying activation content")
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && 
(activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")
 ||
+            
activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
+        } yield activation.right.get
+
+        assert(matchingActivations.length == 2)
+      }, N = 3)
   }
 
   it should "not fire a trigger for a single oversized message" in 
withAssetCleaner(wskprops) {
@@ -289,18 +299,20 @@ class MessageHubFeedTests
       producer.send(bigMessage)
       producer.close()
 
-      // verify there are no activations that match
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
+      retry({
+        // verify there are no activations that match
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
 
-      println("Verifying activation content")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && 
(activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
-      } yield activation.right.get
+        println("Verifying activation content")
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && 
(activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 0)
+        assert(matchingActivations.length == 0)
+      }, N = 3)
   }
 
   it should "reject trigger update without passing in any updatable 
parameters" in withAssetCleaner(wskprops) {
@@ -333,8 +345,7 @@ class MessageHubFeedTests
       ))
 
       withActivation(wsk.activation, run) {
-        activation =>
-          activation.response.success shouldBe false
+        _.response.success shouldBe false
       }
   }
 
@@ -370,8 +381,7 @@ class MessageHubFeedTests
       ))
 
       withActivation(wsk.activation, run) {
-        activation =>
-          activation.response.success shouldBe false
+        _.response.success shouldBe false
       }
   }
 
@@ -430,8 +440,7 @@ class MessageHubFeedTests
       ))
 
       withActivation(wsk.activation, updateRunResult) {
-        activation =>
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       println("Giving the consumer a moment to get ready")
@@ -469,29 +478,31 @@ class MessageHubFeedTests
   }
 
   def checkForActivations(triggerName: String, since: Instant, topic: String, 
key: String, value: String) = {
-    println("Polling for activations")
-    val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = 
Some(since), retries = maxRetries)
-    assert(activations.length == 1)
+    retry({
+      println("Polling for activations")
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since 
= Some(since), retries = maxRetries)
+      assert(activations.length == 1)
 
-    println("Validating content of activation(s)")
-    val matchingActivations = for {
-      id <- activations
-      activation = wsk.activation.waitForActivation(id)
-      if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(value))
-    } yield activation.right.get
+      println("Validating content of activation(s)")
+      val matchingActivations = for {
+        id <- activations
+        activation = wsk.activation.waitForActivation(id)
+        if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(value))
+      } yield activation.right.get
 
-    assert(matchingActivations.length == 1)
+      assert(matchingActivations.length > 0)
 
-    val activation = matchingActivations.head
-    activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+      val activation = matchingActivations.head
+      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
 
-    // assert that there exists a message in the activation which has the 
expected keys and values
-    val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = value)
-    assert(messages.length == 1)
+      // assert that there exists a message in the activation which has the 
expected keys and values
+      val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = value)
+      assert(messages.length == 1)
 
-    val message = messages.head
-    message.getFieldPath("topic") shouldBe Some(topic.toJson)
-    message.getFieldPath("key") shouldBe Some(key.toJson)
+      val message = messages.head
+      message.getFieldPath("topic") shouldBe Some(topic.toJson)
+      message.getFieldPath("key") shouldBe Some(key.toJson)
+    }, N = 3)
   }
 
   def generateMessage(prefix: String, size: Int): String = {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 3c2aac8..4d2b38c 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -35,12 +35,15 @@ import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import spray.json.DefaultJsonProtocol._
 import spray.json.pimpAny
 
 import java.util.Base64
 import java.nio.charset.StandardCharsets
 
+import whisk.utils.retry
+
 @RunWith(classOf[JUnitRunner])
 class MessageHubProduceTests
     extends FlatSpec
@@ -150,7 +153,6 @@ class MessageHubProduceTests
     }
 
     it should "Post a message with a binary value" in 
withAssetCleaner(wskprops) {
-        // create trigger
         val currentTime = s"${System.currentTimeMillis}"
 
         (wp, assetHelper) =>
@@ -169,53 +171,53 @@ class MessageHubProduceTests
             }
 
             withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
+            // It takes a moment for the consumer to fully initialize.
+            println("Giving the consumer a moment to get ready")
+            Thread.sleep(consumerInitTime)
+
             val defaultActionName = s"helloKafka-${currentTime}"
 
             assetHelper.withCleaner(wsk.action, defaultActionName) { (action, 
name) =>
                 action.create(name, defaultAction)
             }
-            assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+            assetHelper.withCleaner(wsk.rule, 
s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
                 rule.create(name, trigger = triggerName, action = 
defaultActionName)
             }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
-
             // produce message
             val decodedMessage = "This will be base64 encoded"
             val encodedMessage = 
Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeValue" -> 
true.toJson) + ("value" -> encodedMessage.toJson)
 
+            println("Producing a message")
             withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
-                activation =>
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-            assert(activations.length > 0)
+            retry({
+                println("Polling for activations")
+                val activations = wsk.activation.pollFor(N = 1, 
Some(triggerName), retries = maxRetries)
+                assert(activations.nonEmpty)
 
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedMessage))
-            } yield activation.right.get
+                val matchingActivations = for {
+                    id <- activations
+                    activation = wsk.activation.waitForActivation(id)
+                    if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedMessage))
+                } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+                assert(matchingActivations.length > 0)
 
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
+                val activation = matchingActivations.head
+                activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
 
-            // assert that there exists a message in the activation which has 
the expected keys and values
-            val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = decodedMessage)
-            assert(messages.length == 1)
+                // assert that there exists a message in the activation which 
has the expected keys and values
+                val messages = KafkaUtils.messagesInActivation(activation, 
field = "value", value = decodedMessage)
+                assert(messages.length == 1)
+            }, N = 3)
     }
 
     it should "Post a message with a binary key" in withAssetCleaner(wskprops) 
{
@@ -238,52 +240,52 @@ class MessageHubProduceTests
             }
 
             withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
+            // It takes a moment for the consumer to fully initialize.
+            println("Giving the consumer a moment to get ready")
+            Thread.sleep(consumerInitTime)
+
             val defaultActionName = s"helloKafka-${currentTime}"
 
             assetHelper.withCleaner(wsk.action, defaultActionName) { (action, 
name) =>
                 action.create(name, defaultAction)
             }
-            assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+            assetHelper.withCleaner(wsk.rule, 
s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
                 rule.create(name, trigger = triggerName, action = 
defaultActionName)
             }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
-
             // produce message
             val decodedKey = "This will be base64 encoded"
             val encodedKey = 
Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeKey" -> 
true.toJson) + ("key" -> encodedKey.toJson)
 
+            println("Producing a message")
             withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
-                activation =>
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-            assert(activations.length > 0)
+            retry({
+                println("Polling for activations")
+                val activations = wsk.activation.pollFor(N = 1, 
Some(triggerName), retries = maxRetries)
+                assert(activations.nonEmpty)
 
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedKey))
-            } yield activation.right.get
+                val matchingActivations = for {
+                    id <- activations
+                    activation = wsk.activation.waitForActivation(id)
+                    if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedKey))
+                } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+                assert(matchingActivations.length > 0)
 
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
+                val activation = matchingActivations.head
+                activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
 
-            // assert that there exists a message in the activation which has 
the expected keys and values
-            val messages = KafkaUtils.messagesInActivation(activation, field = 
"key", value = decodedKey)
-            assert(messages.length == 1)
+                // assert that there exists a message in the activation which 
has the expected keys and values
+                val messages = KafkaUtils.messagesInActivation(activation, 
field = "key", value = decodedKey)
+                assert(messages.length == 1)
+            }, N = 3)
     }
 }

-- 
To stop receiving notification emails like this one, please contact
csantan...@apache.org.

Reply via email to