This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 566990c Do not skip last sequence if an exception occurs (#364)
566990c is described below
commit 566990cfb1c113877b318857036f9151bc4430d1
Author: James Dubee <[email protected]>
AuthorDate: Mon Jan 6 14:21:43 2020 -0500
Do not skip last sequence if an exception occurs (#364)
---
provider/service.py | 25 ++++++---
.../system/packages/MessageHubFeedTests.scala | 61 +++++++++++++++++++++-
2 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/provider/service.py b/provider/service.py
index fa8e109..b8a272f 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,11 +68,6 @@ class Service (Thread):
# check whether or not the feed is capable of detecting canary
# documents
if change != None:
- # Record the sequence in case the changes feed needs to be
- # restarted. This way the new feed can pick up right where
- # the old one left off.
- self.lastSequence = change['seq']
-
if "deleted" in change and change["deleted"] == True:
logging.info('[changes] Found a delete')
consumer =
self.consumers.getConsumerForTrigger(change['id'])
@@ -109,7 +104,20 @@ class Service (Thread):
elif triggerIsAssignedToMe:
logging.info('[{}] Found a change to an
existing trigger'.format(change["id"]))
- if existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
+ if existingConsumer.desiredState() ==
Consumer.State.Dead and self.__isTriggerDocActive(document):
+ # if a delete occurs followed quickly by a create the consumer might get stuck in a dead state,
+ # so we need to forcefully delete the process before recreating it.
+ logging.info('[{}] A create event
occurred for a trigger that is shutting down'.format(change["id"]))
+
+ if existingConsumer.process.is_alive():
+ logging.info('[{}] Joining dead
process.'.format(existingConsumer.trigger))
+ existingConsumer.process.join(1)
+ else:
+ logging.info('[{}] Process is
already dead.'.format(existingConsumer.trigger))
+
+
self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+ self.createAndRunConsumer(document)
+ elif existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
# disabled trigger has become active
logging.info('[{}] Existing disabled
trigger should become active'.format(change["id"]))
self.createAndRunConsumer(document)
@@ -123,6 +131,11 @@ class Service (Thread):
self.lastCanaryTime = datetime.now()
else:
logging.debug('[changes] Found a change for a
non-trigger document')
+
+ # Record the sequence in case the changes feed needs to be
+ # restarted. This way the new feed can pick up right where
+ # the old one left off.
+ self.lastSequence = change['seq']
except Exception as e:
logging.error('[canary] Exception caught from changes feed.
Restarting changes feed...')
logging.error(e)
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 0c05d63..9974bf5 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -16,10 +16,9 @@
*/
package system.packages
-import system.utils.KafkaUtils
-
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
+import system.utils.KafkaUtils
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
@@ -40,6 +39,8 @@ import common.TestUtils.NOT_FOUND
import org.apache.openwhisk.utils.retry
import org.apache.openwhisk.core.entity.Annotations
import java.util.concurrent.ExecutionException
+import common.ActivationResult
+import common.TestUtils.SUCCESS_EXIT
@RunWith(classOf[JUnitRunner])
class MessageHubFeedTests
@@ -113,6 +114,62 @@ class MessageHubFeedTests
runActionWithExpectedResult(actionName, "dat/multipleValueTypes.json",
expectedOutput, false)
}
+ it should "create a trigger, delete that trigger, and quickly create it
again with successful trigger fires" in withAssetCleaner(wskprops) {
+ val currentTime = s"${System.currentTimeMillis}"
+
+ (wp, assetHelper) =>
+ val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
+ val ruleName = s"dummyMessageHub-helloKafka-$currentTime"
+ val parameters = Map(
+ "user" -> getAsJson("user"),
+ "password" -> getAsJson("password"),
+ "api_key" -> getAsJson("api_key"),
+ "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+ "kafka_brokers_sasl" -> getAsJson("brokers"),
+ "topic" -> topic.toJson
+ )
+
+ val key = "TheKey"
+ val verificationName = s"trigger-$currentTime"
+ val defaultAction = Some("dat/createTriggerActions.js")
+ val defaultActionName = s"helloKafka-$currentTime"
+
+ createTrigger(assetHelper, triggerName, parameters)
+
+ assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name)
=>
+ action.create(name, defaultAction, annotations =
Map(Annotations.ProvideApiKeyAnnotationName -> JsBoolean(true)))
+ }
+
+ assetHelper.withCleaner(wsk.rule, ruleName) { (rule, name) =>
+ rule.create(name, trigger = triggerName, action = defaultActionName)
+ }
+
+ assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name)
=>
+ trigger.get(name, NOT_FOUND)
+ }
+
+ produceMessage(topic, key, verificationName)
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+
+ wsk.trigger.delete(verificationName, expectedExitCode = SUCCESS_EXIT)
+ wsk.trigger.delete(triggerName, expectedExitCode = SUCCESS_EXIT)
+
+ val feedCreationResult = wsk.trigger.create(triggerName, feed =
Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+ val activation =
wsk.parseJsonString(feedCreationResult.stdout.substring(0,
feedCreationResult.stdout.indexOf("ok: created
trigger"))).convertTo[ActivationResult]
+ activation.response.success shouldBe true
+
+ wsk.rule.enable(ruleName, expectedExitCode = SUCCESS_EXIT)
+
+ println("Giving the consumer a moment to get ready")
+ Thread.sleep(KafkaUtils.consumerInitTime)
+
+ val uuid =
activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"",
"")
+ consumerExists(uuid)
+
+ produceMessage(topic, key, verificationName)
+ retry(wsk.trigger.get(verificationName), 60, Some(1.second))
+ }
+
it should "fire multiple triggers for two large payloads" in
withAssetCleaner(wskprops) {
// payload size should be under the payload limit, but greater than 50% of the limit
val testPayloadSize = 600000