This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b2d7b8  remove assumption that partitions will be non-empty when passed into callbacks (#243)
1b2d7b8 is described below

commit 1b2d7b849a78ccdf1847a4328c4663665671e157
Author: Adnan Baruni <[email protected]>
AuthorDate: Wed Jan 31 16:03:19 2018 -0600

    remove assumption that partitions will be non-empty when passed into callbacks (#243)
---
 provider/consumer.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 4728d27..5d2d2b4 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -481,21 +481,20 @@ class ConsumerProcess (Process):
         return key
 
     def __error_callback(self, error):
-        logging.warning(error)
+        logging.warning('[{}] {}'.format(self.trigger, error))
         if not self.connected and error.code() == KafkaError._AUTHENTICATION:
             self.authErrors = self.authErrors + 1
             if self.authErrors > self.maxAuthErrors:
+                logging.warning('[{}] Shutting down consumer and disabling 
trigger. Exceeded the allowable number of _AUTHENTICATION 
errors'.format(self.trigger))
                 self.setDesiredState(Consumer.State.Disabled)
                 message = 'Automatically disabled trigger. Consumer failed to 
authenticate with broker(s) after more than 30 attempts with apikey 
{}:{}'.format(self.username, self.password)
                 self.database.disableTrigger(self.trigger, 403, message)
 
     def __on_assign(self, consumer, partitions):
-        topicPartition = partitions[0]
-        logging.info('[{}] Completed partition assignment. topic: {}, 
partition: {}, offset: {}'.format(self.trigger, topicPartition.topic, 
topicPartition.partition, topicPartition.offset))
+        logging.info('[{}] Completed partition assignment. Connected to 
broker(s)'.format(self.trigger))
         self.authErrors = 0
         self.connected = True
 
     def __on_revoke(self, consumer, partitions):
-        topicPartition = partitions[0]
-        logging.info('[{}] Partition assignment has been revoked. topic: {}, 
partition: {}, offset: {}'.format(self.trigger, topicPartition.topic, 
topicPartition.partition, topicPartition.offset))
+        logging.info('[{}] Partition assignment has been revoked. Disconnected 
from broker(s)'.format(self.trigger))
         self.connected = False

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to