This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 1b2d7b8 remove assumption that partitions will be non-empty when passed into callbacks (#243)
1b2d7b8 is described below
commit 1b2d7b849a78ccdf1847a4328c4663665671e157
Author: Adnan Baruni <[email protected]>
AuthorDate: Wed Jan 31 16:03:19 2018 -0600
remove assumption that partitions will be non-empty when passed into callbacks (#243)
---
provider/consumer.py | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/provider/consumer.py b/provider/consumer.py
index 4728d27..5d2d2b4 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -481,21 +481,20 @@ class ConsumerProcess (Process):
return key
def __error_callback(self, error):
- logging.warning(error)
+ logging.warning('[{}] {}'.format(self.trigger, error))
if not self.connected and error.code() == KafkaError._AUTHENTICATION:
self.authErrors = self.authErrors + 1
if self.authErrors > self.maxAuthErrors:
+ logging.warning('[{}] Shutting down consumer and disabling
trigger. Exceeded the allowable number of _AUTHENTICATION
errors'.format(self.trigger))
self.setDesiredState(Consumer.State.Disabled)
message = 'Automatically disabled trigger. Consumer failed to
authenticate with broker(s) after more than 30 attempts with apikey
{}:{}'.format(self.username, self.password)
self.database.disableTrigger(self.trigger, 403, message)
def __on_assign(self, consumer, partitions):
- topicPartition = partitions[0]
- logging.info('[{}] Completed partition assignment. topic: {},
partition: {}, offset: {}'.format(self.trigger, topicPartition.topic,
topicPartition.partition, topicPartition.offset))
+ logging.info('[{}] Completed partition assignment. Connected to
broker(s)'.format(self.trigger))
self.authErrors = 0
self.connected = True
def __on_revoke(self, consumer, partitions):
- topicPartition = partitions[0]
- logging.info('[{}] Partition assignment has been revoked. topic: {},
partition: {}, offset: {}'.format(self.trigger, topicPartition.topic,
topicPartition.partition, topicPartition.offset))
+ logging.info('[{}] Partition assignment has been revoked. Disconnected
from broker(s)'.format(self.trigger))
self.connected = False
--
To stop receiving notification emails like this one, please contact
[email protected].