This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a88dd  Catch Doctor exceptions and do not persist consumer database 
connections (#343)
02a88dd is described below

commit 02a88dd0de5f635f1a9cdbceffb2a5703b35bc49
Author: James Dubee <[email protected]>
AuthorDate: Tue Jun 4 21:37:55 2019 -0400

    Catch Doctor exceptions and do not persist consumer database connections 
(#343)
    
    * Catch Doctor exceptions
    
    * Ensure consumer gets restarted for database connection failures
    
    * A persistent DB connection is not needed for the consumer
---
 provider/consumer.py  | 14 +++++++++++---
 provider/thedoctor.py | 53 +++++++++++++++++++++++++++------------------------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 0647450..5c1de24 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -179,8 +179,6 @@ class ConsumerProcess (Process):
         else:
             self.encodeKeyAsBase64 = False
 
-        self.database = Database()
-
         # always init consumer to None in case the consumer needs to shut down
         # before the KafkaConsumer is fully initialized/assigned
         self.consumer = None
@@ -435,7 +433,17 @@ class ConsumerProcess (Process):
                         # abandon all hope?
                         self.setDesiredState(Consumer.State.Disabled)
                         # mark it disabled in the DB
-                        self.database.disableTrigger(self.trigger, status_code)
+
+                        # when failing to establish a database connection, 
mark the consumer as dead to restart the consumer
+                        try:
+                            self.database = Database()
+                            self.database.disableTrigger(self.trigger, 
status_code)
+                        except Exception as e:
+                            logging.error('[{}] Uncaught exception: 
{}'.format(self.trigger, e))
+                            self.__recordState(Consumer.State.Dead)
+                        finally:
+                            self.database.destroy()
+
                         retry = False
                 except requests.exceptions.RequestException as e:
                     logging.error('[{}] Error talking to OpenWhisk: 
{}'.format(self.trigger, e))
diff --git a/provider/thedoctor.py b/provider/thedoctor.py
index 42fa9e7..42b4232 100644
--- a/provider/thedoctor.py
+++ b/provider/thedoctor.py
@@ -43,32 +43,35 @@ class TheDoctor (Thread):
         logging.info('[Doctor] The Doctor is in!')
 
         while True:
-            consumers = self.consumerCollection.getCopyForRead()
+            try:
+                consumers = self.consumerCollection.getCopyForRead()
 
-            for consumerId in consumers:
-                consumer = consumers[consumerId]
-                logging.debug('[Doctor] [{}] Consumer is in state: 
{}'.format(consumerId, consumer.currentState()))
+                for consumerId in consumers:
+                    consumer = consumers[consumerId]
+                    logging.debug('[Doctor] [{}] Consumer is in state: 
{}'.format(consumerId, consumer.currentState()))
 
-                if consumer.currentState() == Consumer.State.Dead and 
consumer.desiredState() == Consumer.State.Running:
-                    # well this is unexpected...
-                    logging.error('[Doctor][{}] Consumer is dead, but should 
be alive!'.format(consumerId))
-                    consumer.restart()
-                elif consumer.currentState() == Consumer.State.Dead and 
consumer.desiredState() == Consumer.State.Dead:
-                    # Bring out yer dead...
-                    if consumer.process.is_alive():
-                        logging.info('[{}] Joining dead 
process.'.format(consumer.trigger))
-                        # if you don't first join the process, it'll be left 
hanging around as a "defunct" process
-                        consumer.process.join(1)
-                    else:
-                        logging.info('[{}] Process is already 
dead.'.format(consumer.trigger))
+                    if consumer.currentState() == Consumer.State.Dead and 
consumer.desiredState() == Consumer.State.Running:
+                        # well this is unexpected...
+                        logging.error('[Doctor][{}] Consumer is dead, but 
should be alive!'.format(consumerId))
+                        consumer.restart()
+                    elif consumer.currentState() == Consumer.State.Dead and 
consumer.desiredState() == Consumer.State.Dead:
+                        # Bring out yer dead...
+                        if consumer.process.is_alive():
+                            logging.info('[{}] Joining dead 
process.'.format(consumer.trigger))
+                            # if you don't first join the process, it'll be 
left hanging around as a "defunct" process
+                            consumer.process.join(1)
+                        else:
+                            logging.info('[{}] Process is already 
dead.'.format(consumer.trigger))
 
-                    logging.info('[{}] Removing dead consumer from the 
collection.'.format(consumer.trigger))
-                    
self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
-                elif consumer.secondsSinceLastPoll() > 
self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
-                    # there seems to be an issue with the kafka-python client 
where it gets into an
-                    # error-handling loop. This causes poll() to never 
complete, but also does not
-                    # throw an exception.
-                    logging.error('[Doctor][{}] Consumer timed-out, but should 
be alive! Restarting consumer.'.format(consumerId))
-                    consumer.restart()
+                        logging.info('[{}] Removing dead consumer from the 
collection.'.format(consumer.trigger))
+                        
self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
+                    elif consumer.secondsSinceLastPoll() > 
self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
+                        # there seems to be an issue with the kafka-python 
client where it gets into an
+                        # error-handling loop. This causes poll() to never 
complete, but also does not
+                        # throw an exception.
+                        logging.error('[Doctor][{}] Consumer timed-out, but 
should be alive! Restarting consumer.'.format(consumerId))
+                        consumer.restart()
 
-            time.sleep(self.sleepy_time_seconds)
+                time.sleep(self.sleepy_time_seconds)
+            except Exception as e:
+                logging.error("[Doctor] Uncaught exception: {}".format(e))

Reply via email to