This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new c9448f0  Use KafkaProducer Instead of messageHubProduce (#304)
c9448f0 is described below

commit c9448f02f5c352c5481fb7be0128bea6f211f2d9
Author: James Dubee <jwdu...@us.ibm.com>
AuthorDate: Fri Nov 9 18:54:13 2018 -0500

    Use KafkaProducer Instead of messageHubProduce (#304)
    
    * Use KafkaProducer Instead of messageHubProduce
    
    * Add Build Tasks
---
 tests/build.gradle                                 | 10 ++++-
 .../test/scala/system/health/BasicHealthTest.scala |  8 +---
 .../system/packages/MessageHubFeedTests.scala      | 48 ++--------------------
 tests/src/test/scala/system/utils/KafkaUtils.scala | 10 +++++
 4 files changed, 23 insertions(+), 53 deletions(-)

diff --git a/tests/build.gradle b/tests/build.gradle
index b195863..a66ebcb 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -45,10 +45,16 @@ task testHealth(type: Test) {
   include 'system/health/**'
 }
 
-task testNoHealth(type: Test) {
+task testWithProducer(type: Test) {
+  configure commonConfiguration
+  exclude 'system/packages/MessageHubProduceTests.class'
+  exclude 'system/packages/MessagingServiceTests.class'
+  exclude 'system/stress/**'
+}
+
+task testWithProducerAndProducerAction(type: Test) {
   configure commonConfiguration
   exclude 'system/stress/**'
-  exclude 'system/health/**'
   exclude 'system/packages/MessagingServiceTests.class'
 }
 
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index c83f146..7b90679 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -99,13 +99,7 @@ class BasicHealthTest
         trigger.get(name, NOT_FOUND)
       }
 
-      println(s"Producing message with key: $key and value: $verificationName")
-      val producer = createProducer()
-      val record = new ProducerRecord(topic, key, verificationName)
-      val future = producer.send(record)
-
-      producer.flush()
-      producer.close()
+      produceMessage(topic, key, verificationName)
 
       try {
         val result = future.get(60, TimeUnit.SECONDS)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index a847aae..d691125 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -55,7 +55,6 @@ class MessageHubFeedTests
   val sessionTimeout = 10 seconds
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
-  val messageHubProduce = "messageHubProduce"
   val consumerInitTime = 10000 // ms
   val maxRetries = System.getProperty("max.retries", "60").toInt
 
@@ -198,12 +197,8 @@ class MessageHubFeedTests
       val verificationName = s"trigger-$currentTime"
 
       wsk.trigger.get(verificationName, NOT_FOUND)
-
       println("Producing an oversized message")
-      val producer = createProducer()
-      val bigMessage = new ProducerRecord(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
-      producer.send(bigMessage)
-      producer.close()
+      produceMessage(topic, verificationName, generateMessage(s"${currentTime}", testPayloadSize))
 
      a[Exception] should be thrownBy retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
@@ -381,18 +376,7 @@ class MessageHubFeedTests
         trigger.get(name, NOT_FOUND)
       }
 
-      println("Producing a message")
-      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> getAsJson("user"),
-        "password" -> getAsJson("password"),
-        "kafka_brokers_sasl" -> getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "key" -> key.toJson,
-        "value" -> verificationName1.toJson
-      ))) {
-        _.response.success shouldBe true
-      }
-
+      produceMessage(topic, key, verificationName1)
       retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
 
       println("Updating trigger")
@@ -419,19 +403,7 @@ class MessageHubFeedTests
 
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
-
-      println("Producing a message")
-      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> getAsJson("user"),
-        "password" -> getAsJson("password"),
-        "kafka_brokers_sasl" -> getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "key" -> key.toJson,
-        "value" -> verificationName2.toJson
-      ))) {
-        _.response.success shouldBe true
-      }
-
+      produceMessage(topic, key, verificationName2)
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
 
@@ -471,19 +443,7 @@ class MessageHubFeedTests
 
       println("Giving the consumer a moment to get ready")
       Thread.sleep(consumerInitTime)
-
-      println("Producing a message")
-      withActivation(wsk.activation, wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> getAsJson("user"),
-        "password" -> getAsJson("password"),
-        "kafka_brokers_sasl" -> getAsJson("brokers"),
-        "topic" -> topic.toJson,
-        "key" -> key.toJson,
-        "value" -> verificationName1.toJson
-      ))) {
-        _.response.success shouldBe true
-      }
-
+      produceMessage(topic, key, verificationName1)
       retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
   }
 
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index dae3dcf..62361a6 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -117,6 +117,16 @@ trait KafkaUtils extends TestHelpers with WskTestHelpers {
             assert(uuids.nonEmpty)
         }, N = 60, waitBeforeRetry = Some(1.second))
     }
+
+    def produceMessage(topic: String, key: String, value: String) = {
+        println(s"Producing message with key: $key and value: $value")
+        val producer = createProducer()
+        val record = new ProducerRecord(topic, key, value)
+        val future = producer.send(record)
+
+        producer.flush()
+        producer.close()
+    }
 }
 
 object KafkaUtils {

Reply via email to