This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 02a88dd Catch Doctor exceptions and do not persist consumer database
connections (#343)
02a88dd is described below
commit 02a88dd0de5f635f1a9cdbceffb2a5703b35bc49
Author: James Dubee <[email protected]>
AuthorDate: Tue Jun 4 21:37:55 2019 -0400
Catch Doctor exceptions and do not persist consumer database connections
(#343)
* Catch Doctor exceptions
* Ensure consumer gets restarted for database connection failures
* A persistent DB connection is not needed for the consumer
---
provider/consumer.py | 14 +++++++++++---
provider/thedoctor.py | 53 +++++++++++++++++++++++++++------------------------
2 files changed, 39 insertions(+), 28 deletions(-)
diff --git a/provider/consumer.py b/provider/consumer.py
index 0647450..5c1de24 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -179,8 +179,6 @@ class ConsumerProcess (Process):
else:
self.encodeKeyAsBase64 = False
- self.database = Database()
-
# always init consumer to None in case the consumer needs to shut down
# before the KafkaConsumer is fully initialized/assigned
self.consumer = None
@@ -435,7 +433,17 @@ class ConsumerProcess (Process):
# abandon all hope?
self.setDesiredState(Consumer.State.Disabled)
# mark it disabled in the DB
- self.database.disableTrigger(self.trigger, status_code)
+
+ # when failing to establish a database connection,
mark the consumer as dead to restart the consumer
+ try:
+ self.database = Database()
+ self.database.disableTrigger(self.trigger,
status_code)
+ except Exception as e:
+ logging.error('[{}] Uncaught exception:
{}'.format(self.trigger, e))
+ self.__recordState(Consumer.State.Dead)
+ finally:
+ self.database.destroy()
+
retry = False
except requests.exceptions.RequestException as e:
logging.error('[{}] Error talking to OpenWhisk:
{}'.format(self.trigger, e))
diff --git a/provider/thedoctor.py b/provider/thedoctor.py
index 42fa9e7..42b4232 100644
--- a/provider/thedoctor.py
+++ b/provider/thedoctor.py
@@ -43,32 +43,35 @@ class TheDoctor (Thread):
logging.info('[Doctor] The Doctor is in!')
while True:
- consumers = self.consumerCollection.getCopyForRead()
+ try:
+ consumers = self.consumerCollection.getCopyForRead()
- for consumerId in consumers:
- consumer = consumers[consumerId]
- logging.debug('[Doctor] [{}] Consumer is in state:
{}'.format(consumerId, consumer.currentState()))
+ for consumerId in consumers:
+ consumer = consumers[consumerId]
+ logging.debug('[Doctor] [{}] Consumer is in state:
{}'.format(consumerId, consumer.currentState()))
- if consumer.currentState() == Consumer.State.Dead and
consumer.desiredState() == Consumer.State.Running:
- # well this is unexpected...
- logging.error('[Doctor][{}] Consumer is dead, but should
be alive!'.format(consumerId))
- consumer.restart()
- elif consumer.currentState() == Consumer.State.Dead and
consumer.desiredState() == Consumer.State.Dead:
- # Bring out yer dead...
- if consumer.process.is_alive():
- logging.info('[{}] Joining dead
process.'.format(consumer.trigger))
- # if you don't first join the process, it'll be left
hanging around as a "defunct" process
- consumer.process.join(1)
- else:
- logging.info('[{}] Process is already
dead.'.format(consumer.trigger))
+ if consumer.currentState() == Consumer.State.Dead and
consumer.desiredState() == Consumer.State.Running:
+ # well this is unexpected...
+ logging.error('[Doctor][{}] Consumer is dead, but
should be alive!'.format(consumerId))
+ consumer.restart()
+ elif consumer.currentState() == Consumer.State.Dead and
consumer.desiredState() == Consumer.State.Dead:
+ # Bring out yer dead...
+ if consumer.process.is_alive():
+ logging.info('[{}] Joining dead
process.'.format(consumer.trigger))
+ # if you don't first join the process, it'll be
left hanging around as a "defunct" process
+ consumer.process.join(1)
+ else:
+ logging.info('[{}] Process is already
dead.'.format(consumer.trigger))
- logging.info('[{}] Removing dead consumer from the
collection.'.format(consumer.trigger))
-
self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
- elif consumer.secondsSinceLastPoll() >
self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
- # there seems to be an issue with the kafka-python client
where it gets into an
- # error-handling loop. This causes poll() to never
complete, but also does not
- # throw an exception.
- logging.error('[Doctor][{}] Consumer timed-out, but should
be alive! Restarting consumer.'.format(consumerId))
- consumer.restart()
+ logging.info('[{}] Removing dead consumer from the
collection.'.format(consumer.trigger))
+
self.consumerCollection.removeConsumerForTrigger(consumer.trigger)
+ elif consumer.secondsSinceLastPoll() >
self.poll_timeout_seconds and consumer.desiredState() == Consumer.State.Running:
+ # there seems to be an issue with the kafka-python
client where it gets into an
+ # error-handling loop. This causes poll() to never
complete, but also does not
+ # throw an exception.
+ logging.error('[Doctor][{}] Consumer timed-out, but
should be alive! Restarting consumer.'.format(consumerId))
+ consumer.restart()
- time.sleep(self.sleepy_time_seconds)
+ time.sleep(self.sleepy_time_seconds)
+ except Exception as e:
+ logging.error("[Doctor] Uncaught exception: {}".format(e))