This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ea0c7e  Ensure proper encoding for improper encoded keys (#353)
9ea0c7e is described below

commit 9ea0c7ed8ec1ee6382e1a93baa0ae009693a7ded
Author: James Dubee <[email protected]>
AuthorDate: Tue Oct 15 13:16:06 2019 -0400

    Ensure proper encoding for improper encoded keys (#353)
    
    * Ensure proper encoding for invalid JSON messages
    
    * Fix formatting
---
 provider/consumer.py | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/provider/consumer.py b/provider/consumer.py
index 5c1de24..74b243a 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -485,8 +485,7 @@ class ConsumerProcess (Process):
 
         return offsets
 
-    def __encodeMessageIfNeeded(self, value):
-        # let's make sure whatever data we're getting is utf-8 encoded
+    def __getUTF8Encoding(self, value):
         try:
             value.decode('utf-8')
         except UnicodeDecodeError:
@@ -497,8 +496,13 @@ class ConsumerProcess (Process):
                 logging.warn('[{}] Value contains non-unicode bytes. Replacing invalid bytes.'.format(self.trigger))
                 value = unicode(value, errors='replace').encode('utf-8')
         except AttributeError:
-           logging.warn('[{}] Cannot decode a NoneType message value'.format(self.trigger))
-           return value
+            logging.warn('[{}] Cannot decode a NoneType message value'.format(self.trigger))
+
+        return value
+
+
+    def __encodeMessageIfNeeded(self, value):
+        value = self.__getUTF8Encoding(value)
 
         if self.encodeValueAsJSON:
             try:
@@ -506,10 +510,10 @@ class ConsumerProcess (Process):
                 logging.debug('[{}] Successfully encoded a message as JSON.'.format(self.trigger))
                 return parsed
             except ValueError as e:
-                # no big deal, just return the original value
+                # message is not a JSON object, return the message as a JSON value
                 logging.warn('[{}] I was asked to encode a message as JSON, but I failed with "{}".'.format(self.trigger, e))
                 value = "\"{}\"".format(value)
-                pass
+                return value
         elif self.encodeValueAsBase64:
             try:
                 parsed = value.encode("base64").strip()
@@ -532,6 +536,8 @@ class ConsumerProcess (Process):
                 logging.warn('[{}] Unable to encode a binary key.'.format(self.trigger))
                 pass
 
+        key = self.__getUTF8Encoding(key)
+
         logging.debug('[{}] Returning un-encoded message'.format(self.trigger))
         return key
 

Reply via email to