jasonpet closed pull request #306: Update Tests to Wait for Producer to Finish
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/306
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 7b90679..ae8c788 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -17,11 +17,8 @@
 
 package system.health
 
-import java.util.concurrent.{TimeUnit, TimeoutException}
-
 import common.TestUtils.NOT_FOUND
 import common._
-import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
@@ -101,16 +98,6 @@ class BasicHealthTest
 
       produceMessage(topic, key, verificationName)
 
-      try {
-        val result = future.get(60, TimeUnit.SECONDS)
-
-        println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
-      } catch {
-        case e: TimeoutException =>
-          fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
-        case e: Exception => throw e
-      }
-
       // Check if the trigger, that should have been created as reaction on the kafka-message, has been created.
       // The trigger should have been created by the action, that has been triggered by the kafka message.
       // If we cannot find it, the most probably the action did not run.
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index d691125..941f846 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -38,6 +38,7 @@ import common.WskTestHelpers
 import ActionHelper._
 import common.TestUtils.NOT_FOUND
 import org.apache.openwhisk.utils.retry
+import java.util.concurrent.ExecutionException
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubFeedTests
@@ -197,9 +198,9 @@ class MessageHubFeedTests
       val verificationName = s"trigger-$currentTime"
 
       wsk.trigger.get(verificationName, NOT_FOUND)
-      println("Producing an oversized message")
-      produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
 
+      // The producer will generate an error as the payload size is too large for the MessageHub brokers
+      a[ExecutionException] should be thrownBy produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
       a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index 62361a6..19345b3 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -19,6 +19,7 @@ package system.utils
 
 import java.util.HashMap
 import java.util.Properties
+import java.util.concurrent.{TimeUnit, TimeoutException}
 
 import com.jayway.restassured.RestAssured
 import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
@@ -38,6 +39,7 @@ import common.TestHelpers
 import common.TestUtils
 import common.WskTestHelpers
 import org.apache.openwhisk.utils.retry
+import org.apache.kafka.clients.producer.ProducerRecord
 
 trait KafkaUtils extends TestHelpers with WskTestHelpers {
     lazy val messageHubProps = KafkaUtils.initializeMessageHub()
@@ -126,6 +128,16 @@ trait KafkaUtils extends TestHelpers with WskTestHelpers {
 
         producer.flush()
         producer.close()
+
+        try {
+          val result = future.get(60, TimeUnit.SECONDS)
+
+          println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.")
+        } catch {
+          case e: TimeoutException =>
+            fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}")
+          case e: Exception => throw e
+        }
     }
 }
 
@@ -138,7 +150,8 @@ object KafkaUtils {
                                 "password",
                                 "key.serializer",
                                 "value.serializer",
-                                "security.protocol")
+                                "security.protocol",
+                                "max.request.size")
 
         val propertyMap = props.filterKeys(
             requiredKeys.contains(_)
@@ -176,7 +189,7 @@ object KafkaUtils {
         val security_protocol = ("security.protocol", "SASL_SSL");
         val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
+        val maxRequestSize = ("max.request.size", "3000000");
         var brokerList = new ListBuffer[String]()
         val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray()
         val brokerIterator = jsonArray.iterator()
@@ -190,7 +203,7 @@ object KafkaUtils {
         System.setProperty("java.security.auth.login.config", "")
         setMessageHubSecurityConfiguration(user._2, password._2)
 
-        Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer)
+        Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize)
     }
 
     private def setMessageHubSecurityConfiguration(user: String, password: String) = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to