This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push: new 88aa2ef Remove use of messageHubProduce action in health test and use java producer (#264) 88aa2ef is described below commit 88aa2ef4825f1a0e2a26db79b1edd5d4753c834c Author: Adnan Baruni <abar...@users.noreply.github.com> AuthorDate: Tue Apr 17 12:52:36 2018 -0500 Remove use of messageHubProduce action in health test and use java producer (#264) --- .../test/scala/system/health/BasicHealthTest.scala | 27 ++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala index c80458c..799b4db 100644 --- a/tests/src/test/scala/system/health/BasicHealthTest.scala +++ b/tests/src/test/scala/system/health/BasicHealthTest.scala @@ -17,6 +17,8 @@ package system.health +import java.util.concurrent.{TimeUnit, TimeoutException} + import system.utils.KafkaUtils import scala.concurrent.duration.DurationInt @@ -34,6 +36,7 @@ import common.WskTestHelpers import spray.json._ import spray.json.DefaultJsonProtocol._ import com.jayway.restassured.RestAssured +import org.apache.kafka.clients.producer.ProducerRecord import whisk.utils.retry; @RunWith(classOf[JUnitRunner]) @@ -163,15 +166,21 @@ class BasicHealthTest val key = "TheKey" println("Producing a message") - withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map( - "user" -> kafkaUtils.getAsJson("user"), - "password" -> kafkaUtils.getAsJson("password"), - "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"), - "topic" -> topic.toJson, - "key" -> key.toJson, - "value" -> currentTime.toJson - ))) { - _.response.success shouldBe true + val producer = kafkaUtils.createProducer() + val record = new ProducerRecord(topic, key, currentTime) + val future = producer.send(record) + + producer.flush() + producer.close() + + try { + val result = future.get(60, TimeUnit.SECONDS) + + println(s"Produced record to topic: ${result.topic()} on partition: ${result.partition()} at offset: ${result.offset()} with key: $key and value: $currentTime.") + } catch { + case e: TimeoutException => + fail(s"TimeoutException received waiting for message to be produced to topic: $topic with key: $key and value: $value. ${e.getMessage}") + case e: Exception => throw e } retry({ -- To stop receiving notification emails like this one, please contact dube...@apache.org.