jasonpet closed pull request #306: Update Tests to Wait for Producer to Finish URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/306
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala index 7b90679..ae8c788 100644 --- a/tests/src/test/scala/system/health/BasicHealthTest.scala +++ b/tests/src/test/scala/system/health/BasicHealthTest.scala @@ -17,11 +17,8 @@ package system.health -import java.util.concurrent.{TimeUnit, TimeoutException} - import common.TestUtils.NOT_FOUND import common._ -import org.apache.kafka.clients.producer.ProducerRecord import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers} @@ -101,16 +98,6 @@ class BasicHealthTest produceMessage(topic, key, verificationName) - try { - val result = future.get(60, TimeUnit.SECONDS) - - println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.") - } catch { - case e: TimeoutException => - fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}") - case e: Exception => throw e - } - // Check if the trigger, that should have been created as reaction on the kafka-message, has been created. // The trigger should have been created by the action, that has been triggered by the kafka message. // If we cannot find it, the most probably the action did not run. diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala index d691125..941f846 100644 --- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala +++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala @@ -38,6 +38,7 @@ import common.WskTestHelpers import ActionHelper._ import common.TestUtils.NOT_FOUND import org.apache.openwhisk.utils.retry +import java.util.concurrent.ExecutionException @RunWith(classOf[JUnitRunner]) class MessageHubFeedTests @@ -197,9 +198,9 @@ class MessageHubFeedTests val verificationName = s"trigger-$currentTime" wsk.trigger.get(verificationName, NOT_FOUND) - println("Producing an oversized message") - produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize)) + // The producer will generate an error as the payload size is too large for the MessageHub brokers + a[ExecutionException] should be thrownBy produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize)) a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second)) } diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala index 62361a6..19345b3 100644 --- a/tests/src/test/scala/system/utils/KafkaUtils.scala +++ b/tests/src/test/scala/system/utils/KafkaUtils.scala @@ -19,6 +19,7 @@ package system.utils import java.util.HashMap import java.util.Properties +import java.util.concurrent.{TimeUnit, TimeoutException} import com.jayway.restassured.RestAssured import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig} @@ -38,6 +39,7 @@ import common.TestHelpers import common.TestUtils import common.WskTestHelpers import org.apache.openwhisk.utils.retry +import org.apache.kafka.clients.producer.ProducerRecord trait KafkaUtils extends TestHelpers with WskTestHelpers { lazy val messageHubProps = KafkaUtils.initializeMessageHub() @@ -126,6 +128,16 @@ trait KafkaUtils extends TestHelpers with WskTestHelpers { producer.flush() producer.close() + + try { + val result = future.get(60, TimeUnit.SECONDS) + + println(s"Produced message to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with timestamp: ${result.timestamp()}.") + } catch { + case e: TimeoutException => + fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}") + case e: Exception => throw e + } } } @@ -138,7 +150,8 @@ object KafkaUtils { "password", "key.serializer", "value.serializer", - "security.protocol") + "security.protocol", + "max.request.size") val propertyMap = props.filterKeys( requiredKeys.contains(_) @@ -176,7 +189,7 @@ object KafkaUtils { val security_protocol = ("security.protocol", "SASL_SSL"); val keySerializer = ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); val valueSerializer = ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - + val maxRequestSize = ("max.request.size", "3000000"); var brokerList = new ListBuffer[String]() val jsonArray = credentials.get("kafka_brokers_sasl").getAsJsonArray() val brokerIterator = jsonArray.iterator() @@ -190,7 +203,7 @@ object KafkaUtils { System.setProperty("java.security.auth.login.config", "") setMessageHubSecurityConfiguration(user._2, password._2) - Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer) + Map(user, password, kafka_admin_url, api_key, brokers, security_protocol, keySerializer, valueSerializer, maxRequestSize) } private def setMessageHubSecurityConfiguration(user: String, password: String) = { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services