dubeejw commented on a change in pull request #205: Better handle when the
connection to the DB drops
URL:
https://github.com/apache/incubator-openwhisk-package-kafka/pull/205#discussion_r129857068
##########
File path: provider/service.py
##########
@@ -58,76 +49,76 @@ def run(self):
self.canaryGenerator.start()
while True:
- for change in self.changes:
- # change could be None because the changes feed will timeout
- # if it hasn't detected any changes. This timeout allows us to
- # check whether or not the feed is capable of detecting canary
- # documents
- if change != None:
- # Record the sequence in case the changes feed needs to be
- # restarted. This way the new feed can pick up right where
- # the old one left off.
- self.lastSequence = change['seq']
-
- if "deleted" in change and change["deleted"] == True:
- logging.info('[changes] Found a delete')
- consumer =
self.consumers.getConsumerForTrigger(change['id'])
- if consumer != None:
- if consumer.desiredState() ==
Consumer.State.Disabled:
- # just remove it from memory
- logging.info('[{}] Removing disabled
trigger'.format(consumer.trigger))
-
self.consumers.removeConsumerForTrigger(consumer.trigger)
- else:
- logging.info('[{}] Shutting down running
trigger'.format(consumer.trigger))
- consumer.shutdown()
- # since we can't use a filter function for the feed (then
- # you don't get deletes) we need to manually verify this
- # is a valid trigger doc that has changed
- elif 'triggerURL' in change['doc']:
- logging.info('[changes] Found a change in a trigger
document')
- document = change['doc']
-
- if not
self.consumers.hasConsumerForTrigger(change["id"]):
- logging.info('[{}] Found a new trigger to
create'.format(change["id"]))
- self.createAndRunConsumer(document)
- else:
- logging.info('[{}] Found a change to an existing
trigger'.format(change["id"]))
- existingConsumer =
self.consumers.getConsumerForTrigger(change["id"])
-
- if existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
- # disabled trigger has become active
- logging.info('[{}] Existing disabled trigger
should become active'.format(change["id"]))
+ try:
+ logging.info("Starting changes feed")
+ self.database = Database(timeout=changesFeedTimeout)
+ self.changes =
self.database.changesFeed(timeout=changesFeedTimeout, since=self.lastSequence)
+
+ self.lastCanaryTime = datetime.now()
+
+ for change in self.changes:
+ # change could be None because the changes feed will
timeout
+ # if it hasn't detected any changes. This timeout allows
us to
+ # check whether or not the feed is capable of detecting
canary
+ # documents
+ if change != None:
+ # Record the sequence in case the changes feed needs
to be
+ # restarted. This way the new feed can pick up right
where
+ # the old one left off.
+ self.lastSequence = change['seq']
+
+ if "deleted" in change and change["deleted"] == True:
+ logging.info('[changes] Found a delete')
+ consumer =
self.consumers.getConsumerForTrigger(change['id'])
+ if consumer != None:
+ if consumer.desiredState() ==
Consumer.State.Disabled:
+ # just remove it from memory
+ logging.info('[{}] Removing disabled
trigger'.format(consumer.trigger))
+
self.consumers.removeConsumerForTrigger(consumer.trigger)
+ else:
+ logging.info('[{}] Shutting down running
trigger'.format(consumer.trigger))
+ consumer.shutdown()
+ # since we can't use a filter function for the feed
(then
+ # you don't get deletes) we need to manually verify
this
+ # is a valid trigger doc that has changed
+ elif 'triggerURL' in change['doc']:
+ logging.info('[changes] Found a change in a
trigger document')
+ document = change['doc']
+
+ if not
self.consumers.hasConsumerForTrigger(change["id"]):
+ logging.info('[{}] Found a new trigger to
create'.format(change["id"]))
self.createAndRunConsumer(document)
- elif existingConsumer.desiredState() ==
Consumer.State.Running and not self.__isTriggerDocActive(document):
- # running trigger should become disabled
- logging.info('[{}] Existing running trigger
should become disabled'.format(change["id"]))
- existingConsumer.disable()
else:
- logging.debug('[changes] Found non-interesting
trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
- elif 'canary-timestamp' in change['doc']:
- # found a canary - update lastCanaryTime
- logging.info('[canary] I found a canary. The last one
was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
- self.lastCanaryTime = datetime.now()
- else:
- logging.debug('[changes] Found a change for a
non-trigger document')
-
- if secondsSince(self.lastCanaryTime) > canaryTimeout:
- logging.warn('[canary] It has been more than {} seconds
since the last canary - restarting the DB changes feed'.format(canaryTimeout))
- self.restartChangesFeed()
- # break out of the for loop so that it can be
re-established
- # with the new changes feed.
- break
+ logging.info('[{}] Found a change to an
existing trigger'.format(change["id"]))
+ existingConsumer =
self.consumers.getConsumerForTrigger(change["id"])
+
+ if existingConsumer.desiredState() ==
Consumer.State.Disabled and self.__isTriggerDocActive(document):
+ # disabled trigger has become active
+ logging.info('[{}] Existing disabled
trigger should become active'.format(change["id"]))
+ self.createAndRunConsumer(document)
+ elif existingConsumer.desiredState() ==
Consumer.State.Running and not self.__isTriggerDocActive(document):
+ # running trigger should become disabled
+ logging.info('[{}] Existing running
trigger should become disabled'.format(change["id"]))
+ existingConsumer.disable()
+ else:
+ logging.debug('[changes] Found
non-interesting trigger change:
\n{}\n{}'.format(existingConsumer.desiredState(), document))
+ elif 'canary-timestamp' in change['doc']:
+ # found a canary - update lastCanaryTime
+ logging.info('[canary] I found a canary. The last
one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+ self.lastCanaryTime = datetime.now()
+ else:
+ logging.debug('[changes] Found a change for a
non-trigger document')
+ except (ConnectionError, ReadTimeout):
+ logging.error('[canary] DB connection timed out. Restarting
changes feed...')
+ self.stopChangesFeed()
logging.debug("[changes] I made it out of the changes loop!")
- def restartChangesFeed(self):
+ def stopChangesFeed(self):
if self.changes != None:
self.changes.stop()
self.changes = None
Review comment:
This assignment (`self.changes = None`) can be moved into the `if` block above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services