This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 507fa2e Retry failed database changes (#365)
507fa2e is described below
commit 507fa2e614dc60903771ef973508dd3803977e83
Author: James Dubee <[email protected]>
AuthorDate: Wed Jan 8 09:34:59 2020 -0500
Retry failed database changes (#365)
---
provider/service.py | 147 ++++++++++++++++++++++++++++++----------------------
1 file changed, 84 insertions(+), 63 deletions(-)
diff --git a/provider/service.py b/provider/service.py
index b8a272f..26e7909 100644
--- a/provider/service.py
+++ b/provider/service.py
@@ -68,69 +68,7 @@ class Service (Thread):
# check whether or not the feed is capable of detecting
canary
# documents
if change != None:
- if "deleted" in change and change["deleted"] == True:
- logging.info('[changes] Found a delete')
- consumer =
self.consumers.getConsumerForTrigger(change['id'])
- if consumer != None:
- if consumer.desiredState() ==
Consumer.State.Disabled:
- # just remove it from memory
- logging.info('[{}] Removing disabled
trigger'.format(consumer.trigger))
-
self.consumers.removeConsumerForTrigger(consumer.trigger)
- else:
- logging.info('[{}] Shutting down running
trigger'.format(consumer.trigger))
- consumer.shutdown()
- # since we can't use a filter function for the feed
(then
- # you don't get deletes) we need to manually verify
this
- # is a valid trigger doc that has changed
- elif 'triggerURL' in change['doc']:
- logging.info('[changes] Found a change in a
trigger document')
- document = change['doc']
- triggerIsAssignedToMe =
self.__isTriggerDocAssignedToMe(document)
-
- if not
self.consumers.hasConsumerForTrigger(change["id"]):
- if triggerIsAssignedToMe:
- logging.info('[{}] Found a new trigger to
create'.format(change["id"]))
- self.createAndRunConsumer(document)
- else:
- logging.info("[{}] Found a new trigger,
but is assigned to another worker: {}".format(change["id"], document["worker"]))
- else:
- existingConsumer =
self.consumers.getConsumerForTrigger(change["id"])
-
- if existingConsumer.desiredState() ==
Consumer.State.Running and not self.__isTriggerDocActive(document):
- # running trigger should become disabled
- # this should be done regardless of which
worker the document claims to be assigned to
- logging.info('[{}] Existing running
trigger should become disabled'.format(change["id"]))
- existingConsumer.disable()
- elif triggerIsAssignedToMe:
- logging.info('[{}] Found a change to an
existing trigger'.format(change["id"]))
-
- if existingConsumer.desiredState() ==
Consumer.State.Dead and self.__isTriggerDocActive(document):
- # if a delete occurs followed quickly
by a create the consumer might get stuck in a dead state,
- # so we need to forcefully delete the
process before recreating it.
- logging.info('[{}] A create event
occurred for a trigger that is shutting down'.format(change["id"]))
-
- if existingConsumer.process.is_alive():
- logging.info('[{}] Joining dead
process.'.format(existingConsumer.trigger))
- existingConsumer.process.join(1)
- else:
- logging.info('[{}] Process is
already dead.'.format(existingConsumer.trigger))
-
-
self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
- self.createAndRunConsumer(document)
- elif existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
- # disabled trigger has become active
- logging.info('[{}] Existing disabled
trigger should become active'.format(change["id"]))
- self.createAndRunConsumer(document)
- else:
- # trigger has become reassigned to a
different worker
- logging.info("[{}] Shutting down trigger
as it has been re-assigned to {}".format(change["id"], document["worker"]))
- existingConsumer.shutdown()
- elif 'canary-timestamp' in change['doc']:
- # found a canary - update lastCanaryTime
- logging.info('[canary] I found a canary. The last
one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
- self.lastCanaryTime = datetime.now()
- else:
- logging.debug('[changes] Found a change for a
non-trigger document')
+ self.__handleDocChange(change)
# Record the sequence in case the changes feed needs
to be
# restarted. This way the new feed can pick up right
where
@@ -143,6 +81,89 @@ class Service (Thread):
logging.debug("[changes] I made it out of the changes loop!")
+ def __handleDocChange(self, change):
+ retry = True
+ retryCount = 0
+ maxRetries = 5
+
+ while retry:
+ try:
+ if "deleted" in change and change["deleted"] == True:
+ logging.info('[changes] Found a delete')
+ consumer =
self.consumers.getConsumerForTrigger(change['id'])
+ if consumer != None:
+ if consumer.desiredState() == Consumer.State.Disabled:
+ # just remove it from memory
+ logging.info('[{}] Removing disabled
trigger'.format(consumer.trigger))
+
self.consumers.removeConsumerForTrigger(consumer.trigger)
+ else:
+ logging.info('[{}] Shutting down running
trigger'.format(consumer.trigger))
+ consumer.shutdown()
+ # since we can't use a filter function for the feed (then
+ # you don't get deletes) we need to manually verify this
+ # is a valid trigger doc that has changed
+ elif 'triggerURL' in change['doc']:
+ logging.info('[changes] Found a change in a trigger
document')
+ document = change['doc']
+ triggerIsAssignedToMe =
self.__isTriggerDocAssignedToMe(document)
+
+ if not self.consumers.hasConsumerForTrigger(change["id"]):
+ if triggerIsAssignedToMe:
+ logging.info('[{}] Found a new trigger to
create'.format(change["id"]))
+ self.createAndRunConsumer(document)
+ else:
+ logging.info("[{}] Found a new trigger, but is
assigned to another worker: {}".format(change["id"], document["worker"]))
+ else:
+ existingConsumer =
self.consumers.getConsumerForTrigger(change["id"])
+
+ if existingConsumer.desiredState() ==
Consumer.State.Running and not self.__isTriggerDocActive(document):
+ # running trigger should become disabled
+ # this should be done regardless of which worker
the document claims to be assigned to
+ logging.info('[{}] Existing running trigger should
become disabled'.format(change["id"]))
+ existingConsumer.disable()
+ elif triggerIsAssignedToMe:
+ logging.info('[{}] Found a change to an existing
trigger'.format(change["id"]))
+
+ if existingConsumer.desiredState() ==
Consumer.State.Dead and self.__isTriggerDocActive(document):
+ # if a delete occurs followed quickly by a
create the consumer might get stuck in a dead state,
+ # so we need to forcefully delete the process
before recreating it.
+ logging.info('[{}] A create event occurred for
a trigger that is shutting down'.format(change["id"]))
+
+ if existingConsumer.process.is_alive():
+ logging.info('[{}] Joining dead
process.'.format(existingConsumer.trigger))
+ existingConsumer.process.join(1)
+ else:
+ logging.info('[{}] Process is already
dead.'.format(existingConsumer.trigger))
+
+
self.consumers.removeConsumerForTrigger(existingConsumer.trigger)
+ self.createAndRunConsumer(document)
+ elif existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
+ # disabled trigger has become active
+ logging.info('[{}] Existing disabled trigger
should become active'.format(change["id"]))
+ self.createAndRunConsumer(document)
+ else:
+ # trigger has become reassigned to a different
worker
+ logging.info("[{}] Shutting down trigger as it has
been re-assigned to {}".format(change["id"], document["worker"]))
+ existingConsumer.shutdown()
+ elif 'canary-timestamp' in change['doc']:
+ # found a canary - update lastCanaryTime
+ logging.info('[canary] I found a canary. The last one was
{} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+ self.lastCanaryTime = datetime.now()
+ else:
+ logging.debug('[changes] Found a change for a non-trigger
document')
+
+ retry = False
+ except Exception as e:
+ logging.error('[{}] Exception caught while handling
change.'.format(change["id"]))
+ logging.error(e)
+
+ if retry:
+ retryCount += 1
+
+ if retryCount >= maxRetries:
+ logging.warn('[{}] Maximum number of retries exceeded
for failed change.'.format(change["id"]))
+ retry = False
+
def __isTriggerDocAssignedToMe(self, doc):
if "worker" in doc:
return doc["worker"] == self.workerId