This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 9ea0c7e Ensure proper encoding for improperly encoded keys (#353)
9ea0c7e is described below
commit 9ea0c7ed8ec1ee6382e1a93baa0ae009693a7ded
Author: James Dubee <[email protected]>
AuthorDate: Tue Oct 15 13:16:06 2019 -0400
Ensure proper encoding for improperly encoded keys (#353)
* Ensure proper encoding for invalid JSON messages
* Fix formatting
---
provider/consumer.py | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/provider/consumer.py b/provider/consumer.py
index 5c1de24..74b243a 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -485,8 +485,7 @@ class ConsumerProcess (Process):
return offsets
- def __encodeMessageIfNeeded(self, value):
- # let's make sure whatever data we're getting is utf-8 encoded
+ def __getUTF8Encoding(self, value):
try:
value.decode('utf-8')
except UnicodeDecodeError:
@@ -497,8 +496,13 @@ class ConsumerProcess (Process):
logging.warn('[{}] Value contains non-unicode bytes. Replacing
invalid bytes.'.format(self.trigger))
value = unicode(value, errors='replace').encode('utf-8')
except AttributeError:
- logging.warn('[{}] Cannot decode a NoneType message
value'.format(self.trigger))
- return value
+ logging.warn('[{}] Cannot decode a NoneType message
value'.format(self.trigger))
+
+ return value
+
+
+ def __encodeMessageIfNeeded(self, value):
+ value = self.__getUTF8Encoding(value)
if self.encodeValueAsJSON:
try:
@@ -506,10 +510,10 @@ class ConsumerProcess (Process):
logging.debug('[{}] Successfully encoded a message as
JSON.'.format(self.trigger))
return parsed
except ValueError as e:
- # no big deal, just return the original value
+ # message is not a JSON object, return the message as a JSON
value
logging.warn('[{}] I was asked to encode a message as JSON,
but I failed with "{}".'.format(self.trigger, e))
value = "\"{}\"".format(value)
- pass
+ return value
elif self.encodeValueAsBase64:
try:
parsed = value.encode("base64").strip()
@@ -532,6 +536,8 @@ class ConsumerProcess (Process):
logging.warn('[{}] Unable to encode a binary
key.'.format(self.trigger))
pass
+ key = self.__getUTF8Encoding(key)
+
logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
return key