Author: rohini
Date: Mon Jun 16 02:59:22 2025
New Revision: 1926452

URL: http://svn.apache.org/viewvc?rev=1926452&view=rev
Log:
PIG-5410: Support Python 3 for streaming_python (vnarayanan7 via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/python/streaming/controller.py
    pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py
    pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py
    pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
    pig/trunk/test/python/streaming/test_controller.py

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1926452&r1=1926451&r2=1926452&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun 16 02:59:22 2025
@@ -37,6 +37,8 @@ PIG-5282: Upgrade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-5410: Support Python 3 for streaming_python (vnarayanan7 via rohini)
+
 PIG-5470: Make owasp data dependency location configurable (rohini)
 
 PIG-5419: Upgrade Joda time version (vnarayanan7 via rohini)

Modified: pig/trunk/src/python/streaming/controller.py
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/python/streaming/controller.py?rev=1926452&r1=1926451&r2=1926452&view=diff
==============================================================================
--- pig/trunk/src/python/streaming/controller.py (original)
+++ pig/trunk/src/python/streaming/controller.py Mon Jun 16 02:59:22 2025
@@ -12,19 +12,19 @@ except ImportError:
 
 from pig_util import write_user_exception, udf_logging
 
-FIELD_DELIMITER = ','
-TUPLE_START = '('
-TUPLE_END = ')'
-BAG_START = '{'
-BAG_END = '}'
-MAP_START = '['
-MAP_END = ']'
-MAP_KEY = '#'
-PARAMETER_DELIMITER = '\t'
-PRE_WRAP_DELIM = '|'
-POST_WRAP_DELIM = '_'
-NULL_BYTE = "-"
-END_RECORD_DELIM = '|_\n'
+FIELD_DELIMITER = b','
+TUPLE_START = b'('
+TUPLE_END = b')'
+BAG_START = b'{'
+BAG_END = b'}'
+MAP_START = b'['
+MAP_END = b']'
+MAP_KEY = b'#'
+PARAMETER_DELIMITER = b'\t'
+PRE_WRAP_DELIM = b'|'
+POST_WRAP_DELIM = b'_'
+NULL_BYTE = b"-"
+END_RECORD_DELIM = b'|_\n'
 END_RECORD_DELIM_LENGTH = len(END_RECORD_DELIM)
 
 WRAPPED_FIELD_DELIMITER = PRE_WRAP_DELIM + FIELD_DELIMITER + POST_WRAP_DELIM
@@ -41,21 +41,39 @@ TYPE_TUPLE = TUPLE_START
 TYPE_BAG = BAG_START
 TYPE_MAP = MAP_START
 
-TYPE_BOOLEAN = "B"
-TYPE_INTEGER = "I"
-TYPE_LONG = "L"
-TYPE_FLOAT = "F"
-TYPE_DOUBLE = "D"
-TYPE_BYTEARRAY = "A"
-TYPE_CHARARRAY = "C"
-TYPE_DATETIME = "T"
-TYPE_BIGINTEGER = "N"
-TYPE_BIGDECIMAL = "E"
+TYPE_BOOLEAN = b"B"
+TYPE_INTEGER = b"I"
+TYPE_LONG = b"L"
+TYPE_FLOAT = b"F"
+TYPE_DOUBLE = b"D"
+TYPE_BYTEARRAY = b"A"
+TYPE_CHARARRAY = b"C"
+TYPE_DATETIME = b"T"
+TYPE_BIGINTEGER = b"N"
+TYPE_BIGDECIMAL = b"E"
+END_OF_STREAM = TYPE_CHARARRAY + b"\x04" + END_RECORD_DELIM
+TURN_ON_OUTPUT_CAPTURING = TYPE_CHARARRAY + b"TURN_ON_OUTPUT_CAPTURING" + 
END_RECORD_DELIM
 
-END_OF_STREAM = TYPE_CHARARRAY + "\x04" + END_RECORD_DELIM
-TURN_ON_OUTPUT_CAPTURING = TYPE_CHARARRAY + "TURN_ON_OUTPUT_CAPTURING" + 
END_RECORD_DELIM
 NUM_LINES_OFFSET_TRACE = int(os.environ.get('PYTHON_TRACE_OFFSET', 0))
 
+# Deal with Python 2.x to 3.x incompatibilities
+if sys.version_info[0] >= 3:
+    PYTHON3 = True
+    long = int
+    #https://bugs.python.org/issue15610
+    IMPORT_LEVEL = 0
+    # Python 3 handles str as unicode by default. unicode type is dropped
+    def tounicode(input, encoding="utf-8"):
+        if type(input) == str:
+            return input
+        else:
+            return str(input)
+else:
+    PYTHON3 = False
+    IMPORT_LEVEL = -1
+    def tounicode(input, encoding=None):
+        return unicode(input) if encoding is None else unicode(input, encoding)
+
 class PythonStreamingController:
     def __init__(self, profiling_mode=False):
         self.profiling_mode = profiling_mode
@@ -85,10 +103,10 @@ class PythonStreamingController:
         logging.basicConfig(filename=log_file_name, format="%(asctime)s 
%(levelname)s %(message)s", level=udf_logging.udf_log_level)
         logging.info("To reduce the amount of information being logged only a 
small subset of rows are logged at the INFO level.  Call 
udf_logging.set_log_level_debug in pig_util to see all rows being processed.")
 
-        input_str = self.get_next_input()
+        input_bytes = self.get_next_input()
 
         try:
-            func = __import__(module_name, globals(), locals(), [func_name], 
-1).__dict__[func_name]
+            func = __import__(module_name, globals(), locals(), [func_name], 
IMPORT_LEVEL).__dict__[func_name]
         except:
             #These errors should always be caused by user code.
             write_user_exception(module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
@@ -100,7 +118,7 @@ class PythonStreamingController:
         else:
             sys.stdout = self.output_stream
 
-        while input_str != END_OF_STREAM:
+        while input_bytes != END_OF_STREAM:
             should_log = False
             if self.input_count == self.next_input_count_to_log:
                 should_log = True
@@ -113,10 +131,10 @@ class PythonStreamingController:
             try:
                 try:
                     if should_log:
-                        log_message("Row %s: Serialized Input: %s" % 
(self.input_count, input_str))
-                    inputs = deserialize_input(input_str)
+                        log_message("Row %s: Serialized Input: %s" % 
(self.input_count, input_bytes))
+                    inputs = deserialize_input(input_bytes)
                     if should_log:
-                        log_message("Row %s: Deserialized Input: %s" % 
(self.input_count, unicode(inputs)))
+                        log_message("Row %s: Deserialized Input: %s" % 
(self.input_count, tounicode(inputs)))
                 except:
                     #Capture errors where the user passes in bad data.
                     write_user_exception(module_name, self.stream_error, 
NUM_LINES_OFFSET_TRACE)
@@ -126,7 +144,7 @@ class PythonStreamingController:
                     func_output = func(*inputs)
                     if should_log:
                         try:
-                            log_message("Row %s: UDF Output: %s" % 
(self.input_count, unicode(func_output)))
+                            log_message("Row %s: UDF Output: %s" % 
(self.input_count, tounicode(func_output)))
                         except:
                             #This is probably an error with unicoding the 
output.  Calling unicode on bytearray will
                             #throw an exception.  Since its just a log 
statement, just skip and carry on.
@@ -140,7 +158,7 @@ class PythonStreamingController:
                 if should_log:
                     log_message("Row %s: Serialized Output: %s" % 
(self.input_count, output))
 
-                self.stream_output.write( "%s%s" % (output, END_RECORD_DELIM) )
+                self.stream_output.write( b"%s%s" % (output, END_RECORD_DELIM) 
)
             except Exception as e:
                 #This should only catch internal exceptions with the controller
                 #and pig- not with user code.
@@ -153,35 +171,35 @@ class PythonStreamingController:
             self.stream_output.flush()
             self.stream_error.flush()
 
-            input_str = self.get_next_input()
+            input_bytes = self.get_next_input()
 
     def get_next_input(self):
         input_stream = self.input_stream
         output_stream = self.output_stream
 
-        input_str = input_stream.readline()
+        input_bytes = input_stream.readline()
 
-        while input_str.endswith(END_RECORD_DELIM) == False:
+        while input_bytes.endswith(END_RECORD_DELIM) == False:
             line = input_stream.readline()
-            if line == '':
-                input_str = ''
+            if not line:  # EOF: readline() yields b'' which never equals '' on Python 3
+                input_bytes = b''
                 break
-            input_str += line
+            input_bytes += line
 
-        if input_str == '':
+        if not input_bytes:  # empty read means the stream is exhausted
             return END_OF_STREAM
 
-        if input_str == TURN_ON_OUTPUT_CAPTURING:
+        if input_bytes == TURN_ON_OUTPUT_CAPTURING:
             logging.debug("Turned on Output Capturing")
             sys.stdout = output_stream
             return self.get_next_input()
 
-        if input_str == END_OF_STREAM:
-            return input_str
+        if input_bytes == END_OF_STREAM:
+            return input_bytes
 
         self.input_count += 1
 
-        return input_str[:-END_RECORD_DELIM_LENGTH]
+        return input_bytes[:-END_RECORD_DELIM_LENGTH]
 
     def update_next_input_count_to_log(self):
         """
@@ -197,63 +215,76 @@ class PythonStreamingController:
 
     def close_controller(self, exit_code):
         sys.stderr.close()
-        self.stream_error.write("\n")
+        self.stream_error.write(b"\n")
         self.stream_error.close()
         sys.stdout.close()
-        self.stream_output.write("\n")
+        self.stream_output.write(b"\n")
         self.stream_output.close()
         sys.exit(exit_code)
 
-def deserialize_input(input_str):
-    if len(input_str) == 0:
+def deserialize_input(input_bytes):
+    if len(input_bytes) == 0:
         return []
 
-    return [_deserialize_input(param, 0, len(param)-1) for param in 
input_str.split(WRAPPED_PARAMETER_DELIMITER)]
+    return [_deserialize_input(param, 0, len(param)-1) for param in 
input_bytes.split(WRAPPED_PARAMETER_DELIMITER)]
 
-def _deserialize_input(input_str, si, ei):
+def _deserialize_input(input_bytes, si, ei):
     if ei - si < 1:
         #Handle all of the cases where you can have valid empty input.
         if ei == si:
-            if input_str[si] == TYPE_CHARARRAY:
-                return u""
-            elif input_str[si] == TYPE_BYTEARRAY:
-                return bytearray("")
+            if input_bytes[si:si+1] == TYPE_CHARARRAY:
+                if PYTHON3 == True:
+                    return ""
+                else:
+                    return u""
+            elif input_bytes[si:si+1] == TYPE_BYTEARRAY:
+                return bytearray(b"")
             else:
-                raise Exception("Got input type flag %s, but no data to go 
with it.\nInput string: %s\nSlice: %s" % (input_str[si], input_str, 
input_str[si:ei+1]))
+                raise Exception("Got input type flag %s, but no data to go 
with it.\nInput string: %s\nSlice: %s" % (input_bytes[si], input_bytes, 
input_bytes[si:ei+1]))
         else:
-            raise Exception("Start index %d greater than end index %d.\nInput 
string: %s\n, Slice: %s" % (si, ei, input_str[si:ei+1]))
+            raise Exception("Start index %d greater than end index %d.\nInput 
string: %s\n, Slice: %s" % (si, ei, input_bytes[si:ei+1]))
 
-    first = input_str[si]
-    schema = input_str[si+1] if first == PRE_WRAP_DELIM else first
+    first = input_bytes[si:si+1]
+    schema = input_bytes[si+1:si+2] if first == PRE_WRAP_DELIM else first
 
+    # Pig to streaming input is serialized in PigStreaming.serializeToBytes() 
via StorageUtil.putField()
     if schema == NULL_BYTE:
         return None
     elif schema == TYPE_TUPLE or schema == TYPE_MAP or schema == TYPE_BAG:
-        return _deserialize_collection(input_str, schema, si+3, ei-3)
+        return _deserialize_collection(input_bytes, schema, si+3, ei-3)
     elif schema == TYPE_CHARARRAY:
-        return unicode(input_str[si+1:ei+1], 'utf-8')
+        if PYTHON3 == True:
+            return input_bytes[si+1:ei+1].decode("utf-8")
+        else:
+            return tounicode(input_bytes[si+1:ei+1], 'utf-8')
     elif schema == TYPE_BYTEARRAY:
-        return bytearray(input_str[si+1:ei+1])
+        return bytearray(input_bytes[si+1:ei+1])
     elif schema == TYPE_INTEGER:
-        return int(input_str[si+1:ei+1])
+        return int(input_bytes[si+1:ei+1])
     elif schema == TYPE_LONG or schema == TYPE_BIGINTEGER:
-        return long(input_str[si+1:ei+1])
+        return long(input_bytes[si + 1:ei + 1])
     elif schema == TYPE_FLOAT or schema == TYPE_DOUBLE or schema == 
TYPE_BIGDECIMAL:
-        return float(input_str[si+1:ei+1])
+        return float(input_bytes[si+1:ei+1])
     elif schema == TYPE_BOOLEAN:
-        return input_str[si+1:ei+1] == "true"
+        return input_bytes[si+1:ei+1] == b"true"
     elif schema == TYPE_DATETIME:
         #Format is "yyyy-MM-ddTHH:mm:ss.SSS+00:00" or 
"2013-08-23T18:14:03.123+ZZ"
         if USE_DATEUTIL:
-            return parser.parse(input_str[si+1:ei+1])
+            if PYTHON3 == True:
+                return parser.parse(input_bytes[si+1:ei+1].decode('utf-8'))
+            else:
+                return parser.parse(input_bytes[si+1:ei+1])
         else:
             #Try to use datetime even though it doesn't handle time zones 
properly,
             #We only use the first 3 microsecond digits and drop time zone 
(first 23 characters)
-            return datetime.strptime(input_str[si+1:si+24], 
"%Y-%m-%dT%H:%M:%S.%f")
+            if PYTHON3 == True:
+                return 
datetime.strptime(input_bytes[si+1:si+24].decode('utf-8'), 
"%Y-%m-%dT%H:%M:%S.%f")
+            else:
+                return datetime.strptime(input_bytes[si+1:si+24], 
"%Y-%m-%dT%H:%M:%S.%f")
     else:
-        raise Exception("Can't determine type of input: %s" % 
input_str[si:ei+1])
+        raise Exception("Can't determine type of input: %s" % 
input_bytes[si:ei+1])
 
-def _deserialize_collection(input_str, return_type, si, ei):
+def _deserialize_collection(input_bytes, return_type, si, ei):
     list_result = []
     append_to_list_result = list_result.append
     dict_result = {}
@@ -269,24 +300,27 @@ def _deserialize_collection(input_str, r
         while True:
             if index >= ei - 2:
                 if return_type == TYPE_MAP:
-                    dict_result[key] = _deserialize_input(input_str, 
value_start, ei)
+                    dict_result[key] = _deserialize_input(input_bytes, 
value_start, ei)
                 else:
-                    append_to_list_result(_deserialize_input(input_str, 
field_start, ei))
+                    append_to_list_result(_deserialize_input(input_bytes, 
field_start, ei))
                 break
 
             if return_type == TYPE_MAP and not key:
-                key_index = input_str.find(MAP_KEY, index)
-                key = unicode(input_str[index+1:key_index], 'utf-8')
+                key_index = input_bytes.find(MAP_KEY, index)
+                if PYTHON3 == True:
+                    key = input_bytes[index+1:key_index].decode("utf-8")
+                else:
+                    key = tounicode(input_bytes[index+1:key_index], 'utf-8')
                 index = key_index + 1
                 value_start = key_index + 1
                 continue
 
-            if not (input_str[index] == PRE_WRAP_DELIM and input_str[index+2] 
== POST_WRAP_DELIM):
-                prewrap_index = input_str.find(PRE_WRAP_DELIM, index+1)
+            if not (input_bytes[index:index+1] == PRE_WRAP_DELIM and 
input_bytes[index+2:index+3] == POST_WRAP_DELIM):
+                prewrap_index = input_bytes.find(PRE_WRAP_DELIM, index+1)
                 index = (prewrap_index if prewrap_index != -1 else end_index)
                 continue
 
-            mid = input_str[index+1]
+            mid = input_bytes[index+1:index+2]
 
             if mid == BAG_START or mid == TUPLE_START or mid == MAP_START:
                 depth += 1
@@ -294,10 +328,10 @@ def _deserialize_collection(input_str, r
                 depth -= 1
             elif depth == 0 and mid == FIELD_DELIMITER:
                 if return_type == TYPE_MAP:
-                    dict_result[key] = _deserialize_input(input_str, 
value_start, index - 1)
+                    dict_result[key] = _deserialize_input(input_bytes, 
value_start, index - 1)
                     key = None
                 else:
-                    append_to_list_result(_deserialize_input(input_str, 
field_start, index - 1))
+                    append_to_list_result(_deserialize_input(input_bytes, 
field_start, index - 1))
                 field_start = index + 3
 
             index += 3
@@ -315,6 +349,12 @@ def wrap_tuple(o, serialized_item):
     else:
         return serialized_item
 
+def encode_map_key(key):
+    # Map keys are emitted as bytes: bytes keys pass through, text keys are UTF-8 encoded.
+    if PYTHON3 == True and type(key) == bytes:
+        return key
+    return key.encode('utf-8')
+
 def serialize_output(output, utfEncodeAllFields=False):
     """
     @param utfEncodeStrings - Generally we want to utf encode only strings.  
But for
@@ -335,20 +375,35 @@ def serialize_output(output, utfEncodeAl
                 WRAPPED_FIELD_DELIMITER.join([wrap_tuple(o, 
serialize_output(o, utfEncodeAllFields)) for o in output]) +
                 WRAPPED_BAG_END)
     elif output_type == dict:
+        if PYTHON3 == True:
+            items = output.items()
+        else:
+            items = output.iteritems()
         return (WRAPPED_MAP_START +
-                WRAPPED_FIELD_DELIMITER.join(['%s%s%s' % (k.encode('utf-8'), 
MAP_KEY, serialize_output(v, True)) for k, v in output.iteritems()]) +
-                WRAPPED_MAP_END)
+                WRAPPED_FIELD_DELIMITER.join([b'%s%s%s' % (encode_map_key(k), 
MAP_KEY, serialize_output(v, True)) for k, v in items])
+                + WRAPPED_MAP_END)
     elif output_type == bool:
-        return ("true" if output else "false")
-    elif output_type == bytearray:
-        return str(output)
+        return (b"true" if output else b"false")
+    elif output_type == bytearray or (PYTHON3 == True and output_type == 
bytes):
+        if PYTHON3 == True:
+            return bytes(output) if output_type == bytearray else output
+        else:
+            return str(output)
     elif output_type == datetime:
-        return output.isoformat()
-    elif utfEncodeAllFields or output_type == str or output_type == unicode:
+        if PYTHON3 == True:
+            return bytes(output.isoformat(), 'utf-8')
+        else:
+            return output.isoformat()
+    elif PYTHON3 == True and output_type == str:
+        return output.encode('utf-8')
+    elif PYTHON3 == False and (utfEncodeAllFields or output_type == str or 
output_type == unicode):
         #unicode is necessary in cases where we're encoding non-strings.
-        return unicode(output).encode('utf-8')
+        return tounicode(output).encode('utf-8')
     else:
-        return str(output)
+        if PYTHON3 == True:
+            return str(output).encode("utf-8")
+        else:
+            return str(output)
 
 if __name__ == '__main__':
     controller = PythonStreamingController()

Modified: pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py?rev=1926452&r1=1926451&r2=1926452&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py (original)
+++ pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py Mon Jun 16 02:59:22 2025
@@ -46,7 +46,7 @@ def complexTypes(m, t, b):
     if m == None:
         outm = None
     else:
-        for k, v in m.iteritems():
+        for k, v in iter(m.items()):
             outm[k] = len(v)
 
     outb = []

Modified: pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py?rev=1926452&r1=1926451&r2=1926452&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py (original)
+++ pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py Mon Jun 16 02:59:22 2025
@@ -46,7 +46,7 @@ def complexTypes(m, t, b):
     if m == None:
         outm = None
     else:
-        for k, v in m.iteritems():
+        for k, v in iter(m.items()):
             outm[k] = len(v)
 
     outb = []
@@ -93,6 +93,6 @@ def isretired(age):
     else:
         return False
 
-outputSchema("words:{(word:chararray)}")
+@outputSchema("words:{(word:chararray)}")
 def tokenize(sentence):
     return stringutil.tokenize(sentence)

Modified: pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java?rev=1926452&r1=1926451&r2=1926452&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java (original)
+++ pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java Mon Jun 16 
02:59:22 2025
@@ -25,7 +25,10 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataBag;
@@ -56,6 +59,7 @@ import org.junit.runner.RunWith;
     "testPythonUDF_withNewline"
 })
 public class TestStreamingUDF {
+    private static final Log LOG = LogFactory.getLog(TestStreamingUDF.class);
     private static PigServer pigServerLocal = null;
     private static PigServer pigServerMapReduce = null;
 
@@ -83,7 +87,7 @@ public class TestStreamingUDF {
 
         String[] pythonScript = {
                 "from pig_util import outputSchema",
-                "@outputSchema(\'c:chararray\')",
+                "@outputSchema('c:chararray')",
                 "def py_func(one,two):",
                 "   return one + two"
         };
@@ -118,7 +122,6 @@ public class TestStreamingUDF {
     public void testPythonUDF_withBytearrayAndBytes_onCluster() throws 
Exception {
         pigServerMapReduce = new PigServer(cluster.getExecType(), 
cluster.getProperties());
 
-        
         String[] pythonScript = {
             "from pig_util import outputSchema",
             "import os",
@@ -310,17 +313,18 @@ public class TestStreamingUDF {
 
         String[] pythonScript = {
             "# -*- coding: utf-8 -*-",
+            "from __future__ import print_function",
             "from pig_util import outputSchema",
             "import sys",
             "",
             "@outputSchema('tuple_output:tuple(nully:chararray, inty:int, 
longy:long, floaty:float, doubly:double, chararrayy:chararray, 
utf_chararray_basic_string:chararray, utf_chararray_unicode:chararray, 
bytearrayy:bytearray)')",
             "def get_tuple_output():",
-            "    result = (None, 32, 1000000099990000L, 32.0, 3200.1234678509, 
'Some String', 'Hello\\u2026Hello', u'Hello\\u2026Hello', b'Some Byte Array')",
+            "    result = (None, 32, 1000000099990000, 32.0, 3200.1234678509, 
'Some String', b'Hello\\u2026Hello', u'Hello\\u2026Hello', b'Some Byte Array')",
             "    return result",
             "",
             "@outputSchema('tuple_output:tuple(nully:chararray, inty:int, 
longy:long, floaty:float, doubly:double, chararrayy:chararray, 
utf_chararray_basic_string:chararray, utf_chararray_unicode:chararray, 
bytearrayy:bytearray)')",
             "def crazy_tuple_identity(ct):",
-            "    print ct",
+            "    print(ct)",
             "    return ct",
             "",
             "@outputSchema('mappy:map[]')",
@@ -332,7 +336,7 @@ public class TestStreamingUDF {
             "",
             "@outputSchema('silly:chararray')",
             "def silly(silly_word):",
-            "    print silly_word.encode('utf-8')",
+            "    print(silly_word.encode('utf-8'))",
             "    return silly_word",
             "",
             "",
@@ -380,8 +384,13 @@ public class TestStreamingUDF {
         //Get First Bag Tuple
         Tuple innerTuple = bag.iterator().next();
         assertEquals(5, innerTuple.size());
-        
-        //Check one field in innermost tuple
+
+        LOG.info("tuple: " + innerTuple);
+        //Check fields in innermost tuple
         assertEquals("Hello\\u2026Hello", ((Tuple) innerTuple.get(2)).get(6));
+        assertEquals("Hello\u2026Hello", ((Tuple) innerTuple.get(2)).get(7));
+        assertTrue(((Map<String, 
Object>)innerTuple.get(3)).containsKey("Weird\u2026Name"));
+        assertEquals("Weird \u2026 Value", ((Map<String, 
Object>)innerTuple.get(3)).get("Weird\u2026Name"));
+        assertEquals("\u2026", innerTuple.get(4));
     }
 }

Modified: pig/trunk/test/python/streaming/test_controller.py
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/python/streaming/test_controller.py?rev=1926452&r1=1926451&r2=1926452&view=diff
==============================================================================
--- pig/trunk/test/python/streaming/test_controller.py (original)
+++ pig/trunk/test/python/streaming/test_controller.py Mon Jun 16 02:59:22 2025
@@ -1,152 +1,154 @@
 import unittest
 import controller
-import StringIO
 import sys
 
 from datetime import datetime
 
 class TestDeserializer( unittest.TestCase ):
     def test__no_params(self):
-        input = ""
+        input = b""
         expected_output = []
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__chararray(self):
-        input = "C1234"
+        input = b"C1234"
         expected_output = ["1234"]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__empty_chararray(self):
-        input = "C"
+        input = b"C"
         expected_output = [""]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__null_chararray(self):
-        input = "|-_"
+        input = b"|-_"
         expected_output = [None]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__boolean(self):
-        input = "Btrue"
+        input = b"Btrue"
         expected_output = [True]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__datetime_dateutil(self):
         controller.USE_DATEUTIL = True
         try:
-            from dateutil.tz import *
+            from dateutil.tz import tzutc
             from dateutil import parser
-            input = "T2008-09-03T20:56:35.450+00:00"
+            input = b"T2008-09-03T20:56:35.450+00:00"  # controller input is bytes
             expected_output = [ datetime(2008, 9, 3, 20, 56, 35, 450000, 
tzinfo=tzutc()) ]
             out = controller.deserialize_input(input)
-            self.assertEquals(expected_output, out)
+            self.assertEqual(expected_output, out)
         except ImportError:
             pass
 
     def test__datetime_datetime(self):
         controller.USE_DATEUTIL = False
 
-        input = "T2008-09-03T20:56:36.444+00:00"
+        input = b"T2008-09-03T20:56:36.444+00:00"
         expected_output = [ datetime(2008, 9, 3, 20, 56, 36, 444000) ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
         controller.USE_DATEUTIL = True
 
 
     def test__two_elements(self):
-        input = "C032550737A79C543|\t_I970916083725"
+        input = b"C032550737A79C543|\t_I970916083725"
         expected_output = [ "032550737A79C543", 970916083725 ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__three_elements_one_null(self):
-        input = "C032550737A79C543|\t_I970916083725|\t_|-_"
+        input = b"C032550737A79C543|\t_I970916083725|\t_|-_"
         expected_output = [ "032550737A79C543", 970916083725, None ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__bag(self):
-        input = "|{_|(_C79C543|,_I9709|,_Crichard 
keith|)_|,_|(_C79C543|,_I97091|,_Cmicrosoft 
works|)_|,_|(_C79C543|,_I970|,_Csearch engines|)_|}_"
+        input = b"|{_|(_C79C543|,_I9709|,_Crichard 
keith|)_|,_|(_C79C543|,_I97091|,_Cmicrosoft 
works|)_|,_|(_C79C543|,_I970|,_Csearch engines|)_|}_"
         expected_output = [ [ ("79C543", 9709, "richard keith"),
                               ("79C543", 97091, "microsoft works"),
                               ("79C543", 970, "search engines") ] ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__two_elements_bags(self):
-        input = "|{_|(_C79C543|,_D97091608|,_Crichard keith 
frazine|)_|,_|(_C79C543|,_D97091609|,_Cmicrosoft 
works|)_|}_|\t_|{_|(_C79C543|,_D97091608|)_|,_|(_C79C543|,_D9709160|)_|}_"
+        input = b"|{_|(_C79C543|,_D97091608|,_Crichard keith 
frazine|)_|,_|(_C79C543|,_D97091609|,_Cmicrosoft 
works|)_|}_|\t_|{_|(_C79C543|,_D97091608|)_|,_|(_C79C543|,_D9709160|)_|}_"
         expected_output =  [[("79C543",float(97091608),"richard keith 
frazine"),
                              ("79C543", float(97091609), "microsoft works") ],
                             [("79C543", float(97091608)),
                              ("79C543", float(9709160))]]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__atomic_mixed_with_complex(self):
-        input = "|{_|(_C79C543|,_L970916083725|,_Crichard keith 
frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft 
works|)_|}_|\t_I323|\t_|{_|(_C79C543|,_L970916083725|,_Crichard keith 
frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_"
+        input = b"|{_|(_C79C543|,_L970916083725|,_Crichard keith 
frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft 
works|)_|}_|\t_I323|\t_|{_|(_C79C543|,_L970916083725|,_Crichard keith 
frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_"
         expected_output = [ [ ("79C543", 970916083725, "richard keith 
frazine"),
                               ("79C543", 970916095254, "microsoft works") ],
                             323,
                             [ ("79C543", 970916083725, "richard keith 
frazine"),
                               ("79C543", 970916095254, "microsoft works") ] ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__short_tuple(self):
-        input = "|(_I1|)_"
+        input = b"|(_I1|)_"
         expected_output = [ (1,) ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__nested_tuple(self):
-        input = "|(_|(_I123|,_Cabc|)_|,_|(_Cdef|,_I456|)_|)_"
+        input = b"|(_|(_I123|,_Cabc|)_|,_|(_Cdef|,_I456|)_|)_"
         expected_output = [ ( (123, "abc"), ("def", 456) ) ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__map(self):
-        input = "|[_Cname#CJohn|,_Cext#I5555|]_"
+        input = b"|[_Cname#CJohn|,_Cext#I5555|]_"
         expected_output = [ {"name":u"John",
                              "ext":5555} ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__short_field_map(self):
-        input = "|[_Cn#CJohn|,_Ce#C5555|]_"
+        input = b"|[_Cn#CJohn|,_Ce#C5555|]_"
         expected_output = [ {"n":u"John",
                              "e":u"5555"} ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__complex_map(self):
-        input = "|[_CD#|[_CA#I1|,_CB#CE|]_|,_CC#CF|]_"
+        input = b"|[_CD#|[_CA#I1|,_CB#CE|]_|,_CC#CF|]_"
         expected_output = [ { u"D": { u"A": 1,
                                      u"B": u"E"
                                    },
                               u"C": u"F"
                             } ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__all_types(self):
-        input = "A123|\t_Btrue|\t_Cabc|\t_D4.0|\t_F5.0|\t_I32|\t_L45"
-        expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, 45L ]
+        input = b"A123|\t_Btrue|\t_Cabc|\t_D4.0|\t_F5.0|\t_I32|\t_L45"
+        if sys.version_info[0] < 3:
+            expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, long(45) ]
+        else:
+            expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, 45 ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__bad_types(self):
-        input = "K123"
-        self.assertRaises(Exception, controller.deserialize_input, [input])
+        input = b"K123"  # unknown type flag "K" must be rejected
+        self.assertRaises(Exception, controller.deserialize_input, input)
 
     def test__error(self):
         self.maxDiff = None
-        input = 
"|[_Cspec#|{_|(_|[_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cnum_instances#I1|]_|)_|,_|(_|[_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cnum_instances#I1|]_|)_|}_|,_Chardware#Cm1.large|,_Caccount_id#I1234567|,_Clocation#Cus-east-1b|,_Cstatus#Cdestroyed|,_Cimage#Cus-east-1/ami-1234|,_Cjclouds_name#Cmhcdevelopment_1234|,_Cinstances#|{_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-1234|]_|)_|,_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-4321|]_|)_|}_|,_Cstop_timestamp#|-_|,_Cplan_code#Cstandard|,_C_id#I1234567890|,_Crunning_timestamp#|-_|,_Cuser_id#I1234|,_Cstart_timestamp#CTue
 Oct 25 19:26:18 UTC 2011|]
 _"
+        input = 
b"|[_Cspec#|{_|(_|[_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cnum_instances#I1|]_|)_|,_|(_|[_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cnum_instances#I1|]_|)_|}_|,_Chardware#Cm1.large|,_Caccount_id#I1234567|,_Clocation#Cus-east-1b|,_Cstatus#Cdestroyed|,_Cimage#Cus-east-1/ami-1234|,_Cjclouds_name#Cmhcdevelopment_1234|,_Cinstances#|{_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-1234|]_|)_|,_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-4321|]_|)_|}_|,_Cstop_timestamp#|-_|,_Cplan_code#Cstandard|,_C_id#I1234567890|,_Crunning_timestamp#|-_|,_Cuser_id#I1234|,_Cstart_timestamp#CTue
 Oct 25 19:26:18 UTC 2011|
 ]_"
         expected_output = [ {u'status': u'destroyed',
                              u'start_timestamp': u'Tue Oct 25 19:26:18 UTC 
2011',
                              u'user_id': 1234,
@@ -189,126 +191,148 @@ class TestDeserializer( unittest.TestCas
                              u'jclouds_name': u'mhcdevelopment_1234',
                              u'stop_timestamp': None}]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__empty_string(self):
-        input = "C"
+        input = b"C"
         expected_output = [ "" ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__string_with_hash(self):
-        input = "Cabc#!,|g|,(){}"
+        input = b"Cabc#!,|g|,(){}"
         expected_output = [ "abc#!,|g|,(){}" ]
         out = controller.deserialize_input(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
 class TestSerializeOutput( unittest.TestCase ):
     def test__chararray(self):
         input = "1234"
-        expected_output = "1234"
+        expected_output = b"1234"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__empty_chararray(self):
         input = ""
-        expected_output = ""
+        expected_output = b""
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__null_chararray(self):
         input = None
-        expected_output = "|-_"
+        expected_output = b"|-_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__num(self):
         input = 1234
-        expected_output = "1234"
+        expected_output = b"1234"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__bool_true(self):
         input = True
-        expected_output = "true"
+        expected_output = b"true"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__bool_false(self):
         input = False
-        expected_output = "false"
+        expected_output = b"false"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__datetime(self):
         d = datetime.now()
         input = (d,)
-        expected_output = "|(_%s|)_" % d.isoformat()
+        if sys.version_info[0] >= 3:
+            expected_output = b"|(_%s|)_" % bytes(d.isoformat(), 'utf-8')
+        else:
+            expected_output = "|(_%s|)_" % d.isoformat()
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__tuple(self):
         input = (1234, "abc")
-        expected_output = "|(_1234|,_abc|)_"
+        expected_output = b"|(_1234|,_abc|)_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__short_tuple(self):
         input = (1,)
-        expected_output = "|(_1|)_"
+        expected_output = b"|(_1|)_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__bag(self):
         input = [1234, "abc"]
-        expected_output = "|{_1234|,_abc|}_"
+        expected_output = b"|{_|(_1234|)_|,_|(_abc|)_|}_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__nested_tuple(self):
         input = [(32,12,'abc'), 32, ['abc', 'def', 'ghi']]
-        expected_output = "|{_|(_32|,_12|,_abc|)_|,_32|,_|{_abc|,_def|,_ghi|}_|}_"
+        expected_output = b"|{_|(_32|,_12|,_abc|)_|,_|(_32|)_|,_|(_|{_|(_abc|)_|,_|(_def|)_|,_|(_ghi|)_|}_|)_|}_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__map(self):
         input = {'a': 1, 'b':'z'}
-        expected_output = "|[_a#1|,_b#z|]_"
+        expected_output = b"|[_a#1|,_b#z|]_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
     def test__bug(self):
-        input = (None, 32, 98765432109876543210L)
-        expected_output = "|(_|-_|,_32|,_98765432109876543210|)_"
+        input = (None, 32, 98765432109876543210)
+        expected_output = b"|(_|-_|,_32|,_98765432109876543210|)_"
         out = controller.serialize_output(input)
-        self.assertEquals(expected_output, out)
+        self.assertEqual(expected_output, out)
 
 class TestReadInput( unittest.TestCase ):
     def test__multiline_record(self):
         cont = controller.PythonStreamingController()
-        inputio = StringIO.StringIO()
-        inputio.write('12\n')
-        inputio.write('34\n')
-        inputio.write('5|_\n')
+        #Handle Python 2 vs 3 differences
+        #Python 3 controller expects bytes, 2 expects str
+        if sys.version_info[0] >= 3:
+            from io import BytesIO
+            inputio = BytesIO()
+            inputio.write(b'12\n')
+            inputio.write(b'34\n')
+            inputio.write(b'5|_\n')
+        else:
+            from StringIO import StringIO
+            inputio = StringIO()
+            inputio.write('12\n')
+            inputio.write('34\n')
+            inputio.write('5|_\n')
         inputio.seek(0)
 
         cont.input_stream = inputio
         cont.output_stream = sys.stdout
         out = cont.get_next_input()
 
-        self.assertEquals('12\n34\n5', out)
+        self.assertEqual(b'12\n34\n5', out)
 
     def test__complexmultiline_record(self):
         cont = controller.PythonStreamingController()
-        inputio = StringIO.StringIO()
-        inputio.write('|{_|(_32|,_12|,_a\n')
-        inputio.write('bc|)_|,_32|,_|{_ab\n')
-        inputio.write('c|,_def|,_gh\n')
-        inputio.write('i|}_|}_|_\n')
+        if sys.version_info[0] >= 3:
+            from io import BytesIO
+            inputio = BytesIO()
+            inputio.write(b'|{_|(_32|,_12|,_a\n')
+            inputio.write(b'bc|)_|,_32|,_|{_ab\n')
+            inputio.write(b'c|,_def|,_gh\n')
+            inputio.write(b'i|}_|}_|_\n')
+        else:
+            from StringIO import StringIO
+            inputio = StringIO()
+            inputio.write('|{_|(_32|,_12|,_a\n')
+            inputio.write('bc|)_|,_32|,_|{_ab\n')
+            inputio.write('c|,_def|,_gh\n')
+            inputio.write('i|}_|}_|_\n')
         inputio.seek(0)
 
         cont.input_stream = inputio
         cont.output_stream = sys.stdout
         out = cont.get_next_input()
 
-        self.assertEquals('|{_|(_32|,_12|,_a\nbc|)_|,_32|,_|{_ab\nc|,_def|,_gh\ni|}_|}_', out)
+        self.assertEqual(b'|{_|(_32|,_12|,_a\nbc|)_|,_32|,_|{_ab\nc|,_def|,_gh\ni|}_|}_', out)



Reply via email to