This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new a13ef12 Verify trigger fire by sideeffect. (#272)
a13ef12 is described below
commit a13ef12ada9ddb5f18c10551939dcae74aab6894
Author: Christian Bickel <[email protected]>
AuthorDate: Thu Jun 28 19:22:53 2018 +0200
Verify trigger fire by sideeffect. (#272)
Until now, we verified the Kafka provider by checking with `activation list`
whether the action in OpenWhisk had been executed. The problem with this call
is that the activation may not appear in the list in time, because it only
returns the result of a CouchDB view. If there is heavy load on the database,
the view computation may lag behind and not return the activation.
As the side effect, we use the creation of a trigger. The name of the trigger
is read from the Kafka message. Everything else (such as credentials) is
already available in the action, which is invoked anyway.
To check whether the trigger exists, we look it up by its ID, so no view is
involved anymore.
---
tests/dat/createTriggerActions.js | 11 +++++
.../test/scala/system/health/BasicHealthTest.scala | 51 ++++++++++++----------
2 files changed, 39 insertions(+), 23 deletions(-)
diff --git a/tests/dat/createTriggerActions.js
b/tests/dat/createTriggerActions.js
new file mode 100644
index 0000000..58044ea
--- /dev/null
+++ b/tests/dat/createTriggerActions.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+ console.log(JSON.stringify(params));
+ var name = params.messages[0].value;
+ var ow = openwhisk();
+ return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala
b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 090352c..2b99938 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,25 +19,20 @@ package system.health
import java.util.concurrent.{TimeUnit, TimeoutException}
+import com.jayway.restassured.RestAssured
+import common.TestUtils.NOT_FOUND
+import common._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
import system.utils.KafkaUtils
+import whisk.utils.retry
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
-import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
-import org.scalatest.junit.JUnitRunner
-import common.JsHelpers
-import common.TestHelpers
-import common.TestUtils
-import common.Wsk
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import com.jayway.restassured.RestAssured
-import org.apache.kafka.clients.producer.ProducerRecord
-import whisk.utils.retry;
@RunWith(classOf[JUnitRunner])
class BasicHealthTest
@@ -180,7 +175,10 @@ class BasicHealthTest
}, N = 10, waitBeforeRetry = Some(1.second))
}
- val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
+ // This action creates a trigger if it gets executed.
+ // The name of the trigger will be the message, that has been send to
kafka.
+ // We only create this trigger to verify, that the action has been
executed after sending the message to kafka.
+ val defaultAction = Some("dat/createTriggerActions.js")
val defaultActionName = s"helloKafka-$currentTime"
assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name)
=>
@@ -194,9 +192,17 @@ class BasicHealthTest
// key to use for the produced message
val key = "TheKey"
- println(s"Producing message with key: $key and value: $currentTime")
+ val verificationName = s"trigger-$currentTime"
+
+ // Check that the verification trigger does not exist before the action
ran.
+ // This will also clean up the trigger after the test.
+ assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name)
=>
+ trigger.get(name, NOT_FOUND)
+ }
+
+ println(s"Producing message with key: $key and value: $verificationName")
val producer = kafkaUtils.createProducer()
- val record = new ProducerRecord(topic, key, currentTime)
+ val record = new ProducerRecord(topic, key, verificationName)
val future = producer.send(record)
producer.flush()
@@ -212,11 +218,10 @@ class BasicHealthTest
case e: Exception => throw e
}
- retry({
- println("Polling for activations")
- val activations = wsk.activation.pollFor(N = 1, Some(triggerName),
retries = maxRetries)
- assert(activations.nonEmpty)
- }, N = 3)
+ // Check if the trigger, that should have been created as reaction on
the kafka-message, has been created.
+ // The trigger should have been created by the action, that has been
triggered by the kafka message.
+ // If we cannot find it, the most probably the action did not run.
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
}
it should "return correct status and configuration" in
withAssetCleaner(wskprops) {