dubeejw commented on a change in pull request #205: Better handle when the 
connection to the DB drops
URL: 
https://github.com/apache/incubator-openwhisk-package-kafka/pull/205#discussion_r129857068
 
 

 ##########
 File path: provider/service.py
 ##########
 @@ -58,76 +49,76 @@ def run(self):
         self.canaryGenerator.start()
 
         while True:
-            for change in self.changes:
-                # change could be None because the changes feed will timeout
-                # if it hasn't detected any changes. This timeout allows us to
-                # check whether or not the feed is capable of detecting canary
-                # documents
-                if change != None:
-                    # Record the sequence in case the changes feed needs to be
-                    # restarted. This way the new feed can pick up right where
-                    # the old one left off.
-                    self.lastSequence = change['seq']
-
-                    if "deleted" in change and change["deleted"] == True:
-                        logging.info('[changes] Found a delete')
-                        consumer = 
self.consumers.getConsumerForTrigger(change['id'])
-                        if consumer != None:
-                            if consumer.desiredState() == 
Consumer.State.Disabled:
-                                # just remove it from memory
-                                logging.info('[{}] Removing disabled 
trigger'.format(consumer.trigger))
-                                
self.consumers.removeConsumerForTrigger(consumer.trigger)
-                            else:
-                                logging.info('[{}] Shutting down running 
trigger'.format(consumer.trigger))
-                                consumer.shutdown()
-                    # since we can't use a filter function for the feed (then
-                    # you don't get deletes) we need to manually verify this
-                    # is a valid trigger doc that has changed
-                    elif 'triggerURL' in change['doc']:
-                        logging.info('[changes] Found a change in a trigger 
document')
-                        document = change['doc']
-
-                        if not 
self.consumers.hasConsumerForTrigger(change["id"]):
-                            logging.info('[{}] Found a new trigger to 
create'.format(change["id"]))
-                            self.createAndRunConsumer(document)
-                        else:
-                            logging.info('[{}] Found a change to an existing 
trigger'.format(change["id"]))
-                            existingConsumer = 
self.consumers.getConsumerForTrigger(change["id"])
-
-                            if existingConsumer.desiredState() == 
Consumer.State.Disabled and self.__isTriggerDocActive(document):
-                                # disabled trigger has become active
-                                logging.info('[{}] Existing disabled trigger 
should become active'.format(change["id"]))
+            try:
+                logging.info("Starting changes feed")
+                self.database = Database(timeout=changesFeedTimeout)
+                self.changes = 
self.database.changesFeed(timeout=changesFeedTimeout, since=self.lastSequence)
+
+                self.lastCanaryTime = datetime.now()
+
+                for change in self.changes:
+                    # change could be None because the changes feed will 
timeout
+                    # if it hasn't detected any changes. This timeout allows 
us to
+                    # check whether or not the feed is capable of detecting 
canary
+                    # documents
+                    if change != None:
+                        # Record the sequence in case the changes feed needs 
to be
+                        # restarted. This way the new feed can pick up right 
where
+                        # the old one left off.
+                        self.lastSequence = change['seq']
+
+                        if "deleted" in change and change["deleted"] == True:
+                            logging.info('[changes] Found a delete')
+                            consumer = 
self.consumers.getConsumerForTrigger(change['id'])
+                            if consumer != None:
+                                if consumer.desiredState() == 
Consumer.State.Disabled:
+                                    # just remove it from memory
+                                    logging.info('[{}] Removing disabled 
trigger'.format(consumer.trigger))
+                                    
self.consumers.removeConsumerForTrigger(consumer.trigger)
+                                else:
+                                    logging.info('[{}] Shutting down running 
trigger'.format(consumer.trigger))
+                                    consumer.shutdown()
+                        # since we can't use a filter function for the feed 
(then
+                        # you don't get deletes) we need to manually verify 
this
+                        # is a valid trigger doc that has changed
+                        elif 'triggerURL' in change['doc']:
+                            logging.info('[changes] Found a change in a 
trigger document')
+                            document = change['doc']
+
+                            if not 
self.consumers.hasConsumerForTrigger(change["id"]):
+                                logging.info('[{}] Found a new trigger to 
create'.format(change["id"]))
                                 self.createAndRunConsumer(document)
-                            elif existingConsumer.desiredState() == 
Consumer.State.Running and not self.__isTriggerDocActive(document):
-                                # running trigger should become disabled
-                                logging.info('[{}] Existing running trigger 
should become disabled'.format(change["id"]))
-                                existingConsumer.disable()
                             else:
-                                logging.debug('[changes] Found non-interesting 
trigger change: \n{}\n{}'.format(existingConsumer.desiredState(), document))
-                    elif 'canary-timestamp' in change['doc']:
-                        # found a canary - update lastCanaryTime
-                        logging.info('[canary] I found a canary. The last one 
was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
-                        self.lastCanaryTime = datetime.now()
-                    else:
-                        logging.debug('[changes] Found a change for a 
non-trigger document')
-
-                if secondsSince(self.lastCanaryTime) > canaryTimeout:
-                    logging.warn('[canary] It has been more than {} seconds 
since the last canary - restarting the DB changes feed'.format(canaryTimeout))
-                    self.restartChangesFeed()
-                    # break out of the for loop so that it can be 
re-established
-                    # with the new changes feed.
-                    break
+                                logging.info('[{}] Found a change to an 
existing trigger'.format(change["id"]))
+                                existingConsumer = 
self.consumers.getConsumerForTrigger(change["id"])
+
+                                if existingConsumer.desiredState() == 
Consumer.State.Disabled and self.__isTriggerDocActive(document):
+                                    # disabled trigger has become active
+                                    logging.info('[{}] Existing disabled 
trigger should become active'.format(change["id"]))
+                                    self.createAndRunConsumer(document)
+                                elif existingConsumer.desiredState() == 
Consumer.State.Running and not self.__isTriggerDocActive(document):
+                                    # running trigger should become disabled
+                                    logging.info('[{}] Existing running 
trigger should become disabled'.format(change["id"]))
+                                    existingConsumer.disable()
+                                else:
+                                    logging.debug('[changes] Found 
non-interesting trigger change: 
\n{}\n{}'.format(existingConsumer.desiredState(), document))
+                        elif 'canary-timestamp' in change['doc']:
+                            # found a canary - update lastCanaryTime
+                            logging.info('[canary] I found a canary. The last 
one was {} seconds ago.'.format(secondsSince(self.lastCanaryTime)))
+                            self.lastCanaryTime = datetime.now()
+                        else:
+                            logging.debug('[changes] Found a change for a 
non-trigger document')
+            except (ConnectionError, ReadTimeout):
+                logging.error('[canary] DB connection timed out. Restarting 
changes feed...')
+                self.stopChangesFeed()
 
             logging.debug("[changes] I made it out of the changes loop!")
 
-    def restartChangesFeed(self):
+    def stopChangesFeed(self):
         if self.changes != None:
             self.changes.stop()
 
         self.changes = None
 
 Review comment:
   This can be in the if block above.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to