csantanapr closed pull request #253: Resiliency Updates for MessageHub Tests
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/253
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub will not display its
diff after the merge, so the diff is supplied below:

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 02183d2..139ed21 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -141,9 +141,7 @@ class BasicHealthTest
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       // It takes a moment for the consumer to fully initialize.
@@ -280,9 +278,7 @@ class BasicHealthTest
       }
 
       withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       val readRunResult = wsk.action.invoke(actionName, parameters = Map(
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 1fe18a1..e280e39 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -17,18 +17,21 @@
 package system.packages
 
 import system.utils.KafkaUtils
-import org.apache.kafka.clients.producer.ProducerRecord
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
+
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.Inside
 import org.scalatest.junit.JUnitRunner
+import org.apache.kafka.clients.producer.ProducerRecord
+
 import spray.json.DefaultJsonProtocol._
 import spray.json._
+
 import common.JsHelpers
 import common.TestUtils
 import common.TestHelpers
@@ -36,11 +39,14 @@ import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import ActionHelper._
+
 import java.util.Base64
 import java.nio.charset.StandardCharsets
 import java.time.{Clock, Instant}
 
+import whisk.utils.retry
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -166,28 +172,30 @@ class MessageHubFeedTests
         _.response.success shouldBe true
       }
 
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-      assert(activations.length == 1)
+      retry({
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
+        assert(activations.length == 1)
 
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
-      } yield activation.right.get
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(encodedCurrentTime))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 1)
+        assert(matchingActivations.length > 0)
 
-      val activation = matchingActivations.head
-      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+        val activation = matchingActivations.head
+        activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
 
-      // assert that there exists a message in the activation which has the 
expected keys and values
-      val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = encodedCurrentTime)
-      assert(messages.length == 1)
+        // assert that there exists a message in the activation which has the 
expected keys and values
+        val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = encodedCurrentTime)
+        assert(messages.length == 1)
 
-      val message = messages.head
-      message.getFieldPath("topic") shouldBe Some(topic.toJson)
-      message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+        val message = messages.head
+        message.getFieldPath("topic") shouldBe Some(topic.toJson)
+        message.getFieldPath("key") shouldBe Some(encodedKey.toJson)
+      }, N = 3)
   }
 
   it should "not fire a single trigger with an oversized payload" in 
withAssetCleaner(wskprops) {
@@ -234,19 +242,21 @@ class MessageHubFeedTests
       producer.send(secondMessage)
       producer.close()
 
-      // verify there are two trigger activations required to handle these 
messages
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 2, Some(triggerName), 
retries = maxRetries)
-
-      println("Verifying activation content")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
-          activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
-      } yield activation.right.get
-
-      assert(matchingActivations.length == 2)
+      retry({
+        // verify there are two trigger activations required to handle these 
messages
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 2, Some(triggerName), 
retries = maxRetries)
+
+        println("Verifying activation content")
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && (activation.right.get.fields.get("response").toString.contains(s"first${currentTime}") ||
+            activation.right.get.fields.get("response").toString.contains(s"second${currentTime}")))
+        } yield activation.right.get
+
+        assert(matchingActivations.length == 2)
+      }, N = 3)
   }
 
   it should "not fire a trigger for a single oversized message" in 
withAssetCleaner(wskprops) {
@@ -289,18 +299,20 @@ class MessageHubFeedTests
       producer.send(bigMessage)
       producer.close()
 
-      // verify there are no activations that match
-      println("Polling for activations")
-      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
+      retry({
+        // verify there are no activations that match
+        println("Polling for activations")
+        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
 
-      println("Verifying activation content")
-      val matchingActivations = for {
-        id <- activations
-        activation = wsk.activation.waitForActivation(id)
-        if (activation.isRight && 
(activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
-      } yield activation.right.get
+        println("Verifying activation content")
+        val matchingActivations = for {
+          id <- activations
+          activation = wsk.activation.waitForActivation(id)
+          if (activation.isRight && 
(activation.right.get.fields.get("response").toString.contains(s"first${currentTime}")))
+        } yield activation.right.get
 
-      assert(matchingActivations.length == 0)
+        assert(matchingActivations.length == 0)
+      }, N = 3)
   }
 
   it should "reject trigger update without passing in any updatable 
parameters" in withAssetCleaner(wskprops) {
@@ -333,8 +345,7 @@ class MessageHubFeedTests
       ))
 
       withActivation(wsk.activation, run) {
-        activation =>
-          activation.response.success shouldBe false
+        _.response.success shouldBe false
       }
   }
 
@@ -370,8 +381,7 @@ class MessageHubFeedTests
       ))
 
       withActivation(wsk.activation, run) {
-        activation =>
-          activation.response.success shouldBe false
+        _.response.success shouldBe false
       }
   }
 
@@ -430,8 +440,7 @@ class MessageHubFeedTests
       ))
 
       withActivation(wsk.activation, updateRunResult) {
-        activation =>
-          activation.response.success shouldBe true
+        _.response.success shouldBe true
       }
 
       println("Giving the consumer a moment to get ready")
@@ -469,29 +478,31 @@ class MessageHubFeedTests
   }
 
   def checkForActivations(triggerName: String, since: Instant, topic: String, 
key: String, value: String) = {
-    println("Polling for activations")
-    val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since = 
Some(since), retries = maxRetries)
-    assert(activations.length == 1)
+    retry({
+      println("Polling for activations")
+      val activations = wsk.activation.pollFor(N = 1, Some(triggerName), since 
= Some(since), retries = maxRetries)
+      assert(activations.length == 1)
 
-    println("Validating content of activation(s)")
-    val matchingActivations = for {
-      id <- activations
-      activation = wsk.activation.waitForActivation(id)
-      if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(value))
-    } yield activation.right.get
+      println("Validating content of activation(s)")
+      val matchingActivations = for {
+        id <- activations
+        activation = wsk.activation.waitForActivation(id)
+        if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(value))
+      } yield activation.right.get
 
-    assert(matchingActivations.length == 1)
+      assert(matchingActivations.length > 0)
 
-    val activation = matchingActivations.head
-    activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
+      val activation = matchingActivations.head
+      activation.getFieldPath("response", "success") shouldBe Some(true.toJson)
 
-    // assert that there exists a message in the activation which has the 
expected keys and values
-    val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = value)
-    assert(messages.length == 1)
+      // assert that there exists a message in the activation which has the 
expected keys and values
+      val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = value)
+      assert(messages.length == 1)
 
-    val message = messages.head
-    message.getFieldPath("topic") shouldBe Some(topic.toJson)
-    message.getFieldPath("key") shouldBe Some(key.toJson)
+      val message = messages.head
+      message.getFieldPath("topic") shouldBe Some(topic.toJson)
+      message.getFieldPath("key") shouldBe Some(key.toJson)
+    }, N = 3)
   }
 
   def generateMessage(prefix: String, size: Int): String = {
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 3c2aac8..4d2b38c 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -35,12 +35,15 @@ import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
+
 import spray.json.DefaultJsonProtocol._
 import spray.json.pimpAny
 
 import java.util.Base64
 import java.nio.charset.StandardCharsets
 
+import whisk.utils.retry
+
 @RunWith(classOf[JUnitRunner])
 class MessageHubProduceTests
     extends FlatSpec
@@ -150,7 +153,6 @@ class MessageHubProduceTests
     }
 
     it should "Post a message with a binary value" in 
withAssetCleaner(wskprops) {
-        // create trigger
         val currentTime = s"${System.currentTimeMillis}"
 
         (wp, assetHelper) =>
@@ -169,53 +171,53 @@ class MessageHubProduceTests
             }
 
             withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
+            // It takes a moment for the consumer to fully initialize.
+            println("Giving the consumer a moment to get ready")
+            Thread.sleep(consumerInitTime)
+
             val defaultActionName = s"helloKafka-${currentTime}"
 
             assetHelper.withCleaner(wsk.action, defaultActionName) { (action, 
name) =>
                 action.create(name, defaultAction)
             }
-            assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+            assetHelper.withCleaner(wsk.rule, 
s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
                 rule.create(name, trigger = triggerName, action = 
defaultActionName)
             }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
-
             // produce message
             val decodedMessage = "This will be base64 encoded"
             val encodedMessage = 
Base64.getEncoder.encodeToString(decodedMessage.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeValue" -> 
true.toJson) + ("value" -> encodedMessage.toJson)
 
+            println("Producing a message")
             withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
-                activation =>
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-            assert(activations.length > 0)
+            retry({
+                println("Polling for activations")
+                val activations = wsk.activation.pollFor(N = 1, 
Some(triggerName), retries = maxRetries)
+                assert(activations.nonEmpty)
 
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedMessage))
-            } yield activation.right.get
+                val matchingActivations = for {
+                    id <- activations
+                    activation = wsk.activation.waitForActivation(id)
+                    if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedMessage))
+                } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+                assert(matchingActivations.length > 0)
 
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
+                val activation = matchingActivations.head
+                activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
 
-            // assert that there exists a message in the activation which has 
the expected keys and values
-            val messages = KafkaUtils.messagesInActivation(activation, field = 
"value", value = decodedMessage)
-            assert(messages.length == 1)
+                // assert that there exists a message in the activation which 
has the expected keys and values
+                val messages = KafkaUtils.messagesInActivation(activation, 
field = "value", value = decodedMessage)
+                assert(messages.length == 1)
+            }, N = 3)
     }
 
     it should "Post a message with a binary key" in withAssetCleaner(wskprops) 
{
@@ -238,52 +240,52 @@ class MessageHubProduceTests
             }
 
             withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-                activation =>
-                    // should be successful
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
+            // It takes a moment for the consumer to fully initialize.
+            println("Giving the consumer a moment to get ready")
+            Thread.sleep(consumerInitTime)
+
             val defaultActionName = s"helloKafka-${currentTime}"
 
             assetHelper.withCleaner(wsk.action, defaultActionName) { (action, 
name) =>
                 action.create(name, defaultAction)
             }
-            assetHelper.withCleaner(wsk.rule, "rule") { (rule, name) =>
+
+            assetHelper.withCleaner(wsk.rule, 
s"dummyMessageHub-helloKafka-$currentTime") { (rule, name) =>
                 rule.create(name, trigger = triggerName, action = 
defaultActionName)
             }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
-
             // produce message
             val decodedKey = "This will be base64 encoded"
             val encodedKey = 
Base64.getEncoder.encodeToString(decodedKey.getBytes(StandardCharsets.UTF_8))
             val base64ValueParams = validParameters + ("base64DecodeKey" -> 
true.toJson) + ("key" -> encodedKey.toJson)
 
+            println("Producing a message")
             withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", base64ValueParams)) {
-                activation =>
-                    activation.response.success shouldBe true
+                _.response.success shouldBe true
             }
 
-            // verify trigger fired
-            println("Polling for activations")
-            val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-            assert(activations.length > 0)
+            retry({
+                println("Polling for activations")
+                val activations = wsk.activation.pollFor(N = 1, 
Some(triggerName), retries = maxRetries)
+                assert(activations.nonEmpty)
 
-            val matchingActivations = for {
-                id <- activations
-                activation = wsk.activation.waitForActivation(id)
-                if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedKey))
-            } yield activation.right.get
+                val matchingActivations = for {
+                    id <- activations
+                    activation = wsk.activation.waitForActivation(id)
+                    if (activation.isRight && 
activation.right.get.fields.get("response").toString.contains(decodedKey))
+                } yield activation.right.get
 
-            assert(matchingActivations.length == 1)
+                assert(matchingActivations.length > 0)
 
-            val activation = matchingActivations.head
-            activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
+                val activation = matchingActivations.head
+                activation.getFieldPath("response", "success") shouldBe 
Some(true.toJson)
 
-            // assert that there exists a message in the activation which has 
the expected keys and values
-            val messages = KafkaUtils.messagesInActivation(activation, field = 
"key", value = decodedKey)
-            assert(messages.length == 1)
+                // assert that there exists a message in the activation which 
has the expected keys and values
+                val messages = KafkaUtils.messagesInActivation(activation, 
field = "key", value = decodedKey)
+                assert(messages.length == 1)
+            }, N = 3)
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to