Author: rohini Date: Mon Jun 16 02:59:22 2025 New Revision: 1926452 URL: http://svn.apache.org/viewvc?rev=1926452&view=rev Log: PIG-5410: Support Python 3 for streaming_python (vnarayanan7 via rohini)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/python/streaming/controller.py pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java pig/trunk/test/python/streaming/test_controller.py Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1926452&r1=1926451&r2=1926452&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Jun 16 02:59:22 2025 @@ -37,6 +37,8 @@ PIG-5282: Upgrade to Java 8 (satishsaley IMPROVEMENTS +PIG-5410: Support Python 3 for streaming_python (vnarayanan7 via rohini) + PIG-5470: Make owasp data dependency location configurable (rohini) PIG-5419: Upgrade Joda time version (vnarayanan7 via rohini) Modified: pig/trunk/src/python/streaming/controller.py URL: http://svn.apache.org/viewvc/pig/trunk/src/python/streaming/controller.py?rev=1926452&r1=1926451&r2=1926452&view=diff ============================================================================== --- pig/trunk/src/python/streaming/controller.py (original) +++ pig/trunk/src/python/streaming/controller.py Mon Jun 16 02:59:22 2025 @@ -12,19 +12,19 @@ except ImportError: from pig_util import write_user_exception, udf_logging -FIELD_DELIMITER = ',' -TUPLE_START = '(' -TUPLE_END = ')' -BAG_START = '{' -BAG_END = '}' -MAP_START = '[' -MAP_END = ']' -MAP_KEY = '#' -PARAMETER_DELIMITER = '\t' -PRE_WRAP_DELIM = '|' -POST_WRAP_DELIM = '_' -NULL_BYTE = "-" -END_RECORD_DELIM = '|_\n' +FIELD_DELIMITER = b',' +TUPLE_START = b'(' +TUPLE_END = b')' +BAG_START = b'{' +BAG_END = b'}' +MAP_START = b'[' +MAP_END = b']' +MAP_KEY = b'#' +PARAMETER_DELIMITER = b'\t' +PRE_WRAP_DELIM = b'|' +POST_WRAP_DELIM = b'_' +NULL_BYTE = b"-" +END_RECORD_DELIM = b'|_\n' END_RECORD_DELIM_LENGTH = len(END_RECORD_DELIM) WRAPPED_FIELD_DELIMITER = PRE_WRAP_DELIM + FIELD_DELIMITER + POST_WRAP_DELIM @@ -41,21 +41,39 @@ TYPE_TUPLE = TUPLE_START TYPE_BAG = BAG_START TYPE_MAP = MAP_START -TYPE_BOOLEAN = "B" -TYPE_INTEGER = "I" -TYPE_LONG = "L" -TYPE_FLOAT = "F" -TYPE_DOUBLE = "D" -TYPE_BYTEARRAY = "A" -TYPE_CHARARRAY = "C" -TYPE_DATETIME = "T" -TYPE_BIGINTEGER = "N" -TYPE_BIGDECIMAL = "E" +TYPE_BOOLEAN = b"B" +TYPE_INTEGER = b"I" +TYPE_LONG = b"L" +TYPE_FLOAT = b"F" +TYPE_DOUBLE = b"D" +TYPE_BYTEARRAY = b"A" +TYPE_CHARARRAY = b"C" +TYPE_DATETIME = b"T" +TYPE_BIGINTEGER = b"N" +TYPE_BIGDECIMAL = b"E" +END_OF_STREAM = TYPE_CHARARRAY + b"\x04" + END_RECORD_DELIM +TURN_ON_OUTPUT_CAPTURING = TYPE_CHARARRAY + b"TURN_ON_OUTPUT_CAPTURING" + END_RECORD_DELIM -END_OF_STREAM = TYPE_CHARARRAY + "\x04" + END_RECORD_DELIM -TURN_ON_OUTPUT_CAPTURING = TYPE_CHARARRAY + "TURN_ON_OUTPUT_CAPTURING" + END_RECORD_DELIM NUM_LINES_OFFSET_TRACE = int(os.environ.get('PYTHON_TRACE_OFFSET', 0)) +# Deal with Python 2.x to 3.x incompatibilities +if sys.version_info[0] >= 3: + PYTHON3 = True + long = int + #https://bugs.python.org/issue15610 + IMPORT_LEVEL = 0 + # Python 3 handles str as unicode by default. unicode type is dropped + def tounicode(input, encoding="utf-8"): + if type(input) == str: + return input + else: + return str(input) +else: + PYTHON3 = False + IMPORT_LEVEL = -1 + def tounicode(input, encoding=None): + return unicode(input) if encoding is None else unicode(input, encoding) + class PythonStreamingController: def __init__(self, profiling_mode=False): self.profiling_mode = profiling_mode @@ -85,10 +103,10 @@ class PythonStreamingController: logging.basicConfig(filename=log_file_name, format="%(asctime)s %(levelname)s %(message)s", level=udf_logging.udf_log_level) logging.info("To reduce the amount of information being logged only a small subset of rows are logged at the INFO level. Call udf_logging.set_log_level_debug in pig_util to see all rows being processed.") - input_str = self.get_next_input() + input_bytes = self.get_next_input() try: - func = __import__(module_name, globals(), locals(), [func_name], -1).__dict__[func_name] + func = __import__(module_name, globals(), locals(), [func_name], IMPORT_LEVEL).__dict__[func_name] except: #These errors should always be caused by user code. write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE) @@ -100,7 +118,7 @@ class PythonStreamingController: else: sys.stdout = self.output_stream - while input_str != END_OF_STREAM: + while input_bytes != END_OF_STREAM: should_log = False if self.input_count == self.next_input_count_to_log: should_log = True @@ -113,10 +131,10 @@ class PythonStreamingController: try: try: if should_log: - log_message("Row %s: Serialized Input: %s" % (self.input_count, input_str)) - inputs = deserialize_input(input_str) + log_message("Row %s: Serialized Input: %s" % (self.input_count, input_bytes)) + inputs = deserialize_input(input_bytes) if should_log: - log_message("Row %s: Deserialized Input: %s" % (self.input_count, unicode(inputs))) + log_message("Row %s: Deserialized Input: %s" % (self.input_count, tounicode(inputs))) except: #Capture errors where the user passes in bad data. write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE) @@ -126,7 +144,7 @@ class PythonStreamingController: func_output = func(*inputs) if should_log: try: - log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output))) + log_message("Row %s: UDF Output: %s" % (self.input_count, tounicode(func_output))) except: #This is probably an error with unicoding the output. Calling unicode on bytearray will #throw an exception. Since its just a log statement, just skip and carry on. @@ -140,7 +158,7 @@ class PythonStreamingController: if should_log: log_message("Row %s: Serialized Output: %s" % (self.input_count, output)) - self.stream_output.write( "%s%s" % (output, END_RECORD_DELIM) ) + self.stream_output.write( b"%s%s" % (output, END_RECORD_DELIM) ) except Exception as e: #This should only catch internal exceptions with the controller #and pig- not with user code. @@ -153,35 +171,35 @@ class PythonStreamingController: self.stream_output.flush() self.stream_error.flush() - input_str = self.get_next_input() + input_bytes = self.get_next_input() def get_next_input(self): input_stream = self.input_stream output_stream = self.output_stream - input_str = input_stream.readline() + input_bytes = input_stream.readline() - while input_str.endswith(END_RECORD_DELIM) == False: + while input_bytes.endswith(END_RECORD_DELIM) == False: line = input_stream.readline() if line == '': - input_str = '' + input_bytes = '' break - input_str += line + input_bytes += line - if input_str == '': + if input_bytes == '': return END_OF_STREAM - if input_str == TURN_ON_OUTPUT_CAPTURING: + if input_bytes == TURN_ON_OUTPUT_CAPTURING: logging.debug("Turned on Output Capturing") sys.stdout = output_stream return self.get_next_input() - if input_str == END_OF_STREAM: - return input_str + if input_bytes == END_OF_STREAM: + return input_bytes self.input_count += 1 - return input_str[:-END_RECORD_DELIM_LENGTH] + return input_bytes[:-END_RECORD_DELIM_LENGTH] def update_next_input_count_to_log(self): """ @@ -197,63 +215,76 @@ class PythonStreamingController: def close_controller(self, exit_code): sys.stderr.close() - self.stream_error.write("\n") + self.stream_error.write(b"\n") self.stream_error.close() sys.stdout.close() - self.stream_output.write("\n") + self.stream_output.write(b"\n") self.stream_output.close() sys.exit(exit_code) -def deserialize_input(input_str): - if len(input_str) == 0: +def deserialize_input(input_bytes): + if len(input_bytes) == 0: return [] - return [_deserialize_input(param, 0, len(param)-1) for param in input_str.split(WRAPPED_PARAMETER_DELIMITER)] + return [_deserialize_input(param, 0, len(param)-1) for param in input_bytes.split(WRAPPED_PARAMETER_DELIMITER)] -def _deserialize_input(input_str, si, ei): +def _deserialize_input(input_bytes, si, ei): if ei - si < 1: #Handle all of the cases where you can have valid empty input. if ei == si: - if input_str[si] == TYPE_CHARARRAY: - return u"" - elif input_str[si] == TYPE_BYTEARRAY: - return bytearray("") + if input_bytes[si:si+1] == TYPE_CHARARRAY: + if PYTHON3 == True: + return "" + else: + return u"" + elif input_bytes[si:si+1] == TYPE_BYTEARRAY: + return bytearray(b"") else: - raise Exception("Got input type flag %s, but no data to go with it.\nInput string: %s\nSlice: %s" % (input_str[si], input_str, input_str[si:ei+1])) + raise Exception("Got input type flag %s, but no data to go with it.\nInput string: %s\nSlice: %s" % (input_bytes[si], input_bytes, input_bytes[si:ei+1])) else: - raise Exception("Start index %d greater than end index %d.\nInput string: %s\n, Slice: %s" % (si, ei, input_str[si:ei+1])) + raise Exception("Start index %d greater than end index %d.\nInput string: %s\n, Slice: %s" % (si, ei, input_bytes[si:ei+1])) - first = input_str[si] - schema = input_str[si+1] if first == PRE_WRAP_DELIM else first + first = input_bytes[si:si+1] + schema = input_bytes[si+1:si+2] if first == PRE_WRAP_DELIM else first + # Pig to streaming input is serialized in PigStreaming.serializeToBytes() via StorageUtil.putField() if schema == NULL_BYTE: return None elif schema == TYPE_TUPLE or schema == TYPE_MAP or schema == TYPE_BAG: - return _deserialize_collection(input_str, schema, si+3, ei-3) + return _deserialize_collection(input_bytes, schema, si+3, ei-3) elif schema == TYPE_CHARARRAY: - return unicode(input_str[si+1:ei+1], 'utf-8') + if PYTHON3 == True: + return input_bytes[si+1:ei+1].decode("utf-8") + else: + return tounicode(input_bytes[si+1:ei+1], 'utf-8') elif schema == TYPE_BYTEARRAY: - return bytearray(input_str[si+1:ei+1]) + return bytearray(input_bytes[si+1:ei+1]) elif schema == TYPE_INTEGER: - return int(input_str[si+1:ei+1]) + return int(input_bytes[si+1:ei+1]) elif schema == TYPE_LONG or schema == TYPE_BIGINTEGER: - return long(input_str[si+1:ei+1]) + return long(input_bytes[si + 1:ei + 1]) elif schema == TYPE_FLOAT or schema == TYPE_DOUBLE or schema == TYPE_BIGDECIMAL: - return float(input_str[si+1:ei+1]) + return float(input_bytes[si+1:ei+1]) elif schema == TYPE_BOOLEAN: - return input_str[si+1:ei+1] == "true" + return input_bytes[si+1:ei+1] == b"true" elif schema == TYPE_DATETIME: #Format is "yyyy-MM-ddTHH:mm:ss.SSS+00:00" or "2013-08-23T18:14:03.123+ZZ" if USE_DATEUTIL: - return parser.parse(input_str[si+1:ei+1]) + if PYTHON3 == True: + return parser.parse(input_bytes[si+1:ei+1].decode('utf-8')) + else: + return parser.parse(input_bytes[si+1:ei+1]) else: #Try to use datetime even though it doesn't handle time zones properly, #We only use the first 3 microsecond digits and drop time zone (first 23 characters) - return datetime.strptime(input_str[si+1:si+24], "%Y-%m-%dT%H:%M:%S.%f") + if PYTHON3 == True: + return datetime.strptime(input_bytes[si+1:si+24].decode('utf-8'), "%Y-%m-%dT%H:%M:%S.%f") + else: + return datetime.strptime(input_bytes[si+1:si+24], "%Y-%m-%dT%H:%M:%S.%f") else: - raise Exception("Can't determine type of input: %s" % input_str[si:ei+1]) + raise Exception("Can't determine type of input: %s" % input_bytes[si:ei+1]) -def _deserialize_collection(input_str, return_type, si, ei): +def _deserialize_collection(input_bytes, return_type, si, ei): list_result = [] append_to_list_result = list_result.append dict_result = {} @@ -269,24 +300,27 @@ def _deserialize_collection(input_str, r while True: if index >= ei - 2: if return_type == TYPE_MAP: - dict_result[key] = _deserialize_input(input_str, value_start, ei) + dict_result[key] = _deserialize_input(input_bytes, value_start, ei) else: - append_to_list_result(_deserialize_input(input_str, field_start, ei)) + append_to_list_result(_deserialize_input(input_bytes, field_start, ei)) break if return_type == TYPE_MAP and not key: - key_index = input_str.find(MAP_KEY, index) - key = unicode(input_str[index+1:key_index], 'utf-8') + key_index = input_bytes.find(MAP_KEY, index) + if PYTHON3 == True: + key = input_bytes[index+1:key_index].decode("utf-8") + else: + key = tounicode(input_bytes[index+1:key_index], 'utf-8') index = key_index + 1 value_start = key_index + 1 continue - if not (input_str[index] == PRE_WRAP_DELIM and input_str[index+2] == POST_WRAP_DELIM): - prewrap_index = input_str.find(PRE_WRAP_DELIM, index+1) + if not (input_bytes[index:index+1] == PRE_WRAP_DELIM and input_bytes[index+2:index+3] == POST_WRAP_DELIM): + prewrap_index = input_bytes.find(PRE_WRAP_DELIM, index+1) index = (prewrap_index if prewrap_index != -1 else end_index) continue - mid = input_str[index+1] + mid = input_bytes[index+1:index+2] if mid == BAG_START or mid == TUPLE_START or mid == MAP_START: depth += 1 @@ -294,10 +328,10 @@ def _deserialize_collection(input_str, r depth -= 1 elif depth == 0 and mid == FIELD_DELIMITER: if return_type == TYPE_MAP: - dict_result[key] = _deserialize_input(input_str, value_start, index - 1) + dict_result[key] = _deserialize_input(input_bytes, value_start, index - 1) key = None else: - append_to_list_result(_deserialize_input(input_str, field_start, index - 1)) + append_to_list_result(_deserialize_input(input_bytes, field_start, index - 1)) field_start = index + 3 index += 3 @@ -315,6 +349,12 @@ def wrap_tuple(o, serialized_item): else: return serialized_item +def encode_map_key(key): + if PYTHON3 == True and type(key) == bytes: + return bytes + else: + return key.encode('utf-8') + def serialize_output(output, utfEncodeAllFields=False): """ @param utfEncodeStrings - Generally we want to utf encode only strings. But for @@ -335,20 +375,35 @@ def serialize_output(output, utfEncodeAl WRAPPED_FIELD_DELIMITER.join([wrap_tuple(o, serialize_output(o, utfEncodeAllFields)) for o in output]) + WRAPPED_BAG_END) elif output_type == dict: + if PYTHON3 == True: + items = output.items() + else: + items = output.iteritems() return (WRAPPED_MAP_START + - WRAPPED_FIELD_DELIMITER.join(['%s%s%s' % (k.encode('utf-8'), MAP_KEY, serialize_output(v, True)) for k, v in output.iteritems()]) + - WRAPPED_MAP_END) + WRAPPED_FIELD_DELIMITER.join([b'%s%s%s' % (encode_map_key(k), MAP_KEY, serialize_output(v, True)) for k, v in items]) + + WRAPPED_MAP_END) elif output_type == bool: - return ("true" if output else "false") - elif output_type == bytearray: - return str(output) + return (b"true" if output else b"false") + elif output_type == bytearray or (PYTHON3 == True and output_type == bytes): + if PYTHON3 == True: + return bytes(output) if output_type == bytearray else output + else: + return str(output) elif output_type == datetime: - return output.isoformat() - elif utfEncodeAllFields or output_type == str or output_type == unicode: + if PYTHON3 == True: + return bytes(output.isoformat(), 'utf-8') + else: + return output.isoformat() + elif PYTHON3 == True and output_type == str: + return output.encode('utf-8') + elif PYTHON3 == False and (utfEncodeAllFields or output_type == str or output_type == unicode): #unicode is necessary in cases where we're encoding non-strings. - return unicode(output).encode('utf-8') + return tounicode(output).encode('utf-8') else: - return str(output) + if PYTHON3 == True: + return str(output).encode("utf-8") + else: + return str(output) if __name__ == '__main__': controller = PythonStreamingController() Modified: pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py?rev=1926452&r1=1926451&r2=1926452&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py (original) +++ pig/trunk/test/e2e/pig/udfs/cpython/scriptingudf.py Mon Jun 16 02:59:22 2025 @@ -46,7 +46,7 @@ def complexTypes(m, t, b): if m == None: outm = None else: - for k, v in m.iteritems(): + for k, v in iter(m.items()): outm[k] = len(v) outb = [] Modified: pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py?rev=1926452&r1=1926451&r2=1926452&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py (original) +++ pig/trunk/test/e2e/pig/udfs/python/scriptingudf.py Mon Jun 16 02:59:22 2025 @@ -46,7 +46,7 @@ def complexTypes(m, t, b): if m == None: outm = None else: - for k, v in m.iteritems(): + for k, v in iter(m.items()): outm[k] = len(v) outb = [] @@ -93,6 +93,6 @@ def isretired(age): else: return False -outputSchema("words:{(word:chararray)}") +@outputSchema("words:{(word:chararray)}") def tokenize(sentence): return stringutil.tokenize(sentence) Modified: pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java?rev=1926452&r1=1926451&r2=1926452&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java (original) +++ pig/trunk/test/org/apache/pig/impl/builtin/TestStreamingUDF.java Mon Jun 16 02:59:22 2025 @@ -25,7 +25,10 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.Iterator; import java.util.List; +import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataBag; @@ -56,6 +59,7 @@ import org.junit.runner.RunWith; "testPythonUDF_withNewline" }) public class TestStreamingUDF { + private static final Log LOG = LogFactory.getLog(TestStreamingUDF.class); private static PigServer pigServerLocal = null; private static PigServer pigServerMapReduce = null; @@ -83,7 +87,7 @@ public class TestStreamingUDF { String[] pythonScript = { "from pig_util import outputSchema", - "@outputSchema(\'c:chararray\')", + "@outputSchema('c:chararray')", "def py_func(one,two):", " return one + two" }; @@ -118,7 +122,6 @@ public class TestStreamingUDF { public void testPythonUDF_withBytearrayAndBytes_onCluster() throws Exception { pigServerMapReduce = new PigServer(cluster.getExecType(), cluster.getProperties()); - String[] pythonScript = { "from pig_util import outputSchema", "import os", @@ -310,17 +313,18 @@ public class TestStreamingUDF { String[] pythonScript = { "# -*- coding: utf-8 -*-", + "from __future__ import print_function", "from pig_util import outputSchema", "import sys", "", "@outputSchema('tuple_output:tuple(nully:chararray, inty:int, longy:long, floaty:float, doubly:double, chararrayy:chararray, utf_chararray_basic_string:chararray, utf_chararray_unicode:chararray, bytearrayy:bytearray)')", "def get_tuple_output():", - " result = (None, 32, 1000000099990000L, 32.0, 3200.1234678509, 'Some String', 'Hello\\u2026Hello', u'Hello\\u2026Hello', b'Some Byte Array')", + " result = (None, 32, 1000000099990000, 32.0, 3200.1234678509, 'Some String', b'Hello\\u2026Hello', u'Hello\\u2026Hello', b'Some Byte Array')", " return result", "", "@outputSchema('tuple_output:tuple(nully:chararray, inty:int, longy:long, floaty:float, doubly:double, chararrayy:chararray, utf_chararray_basic_string:chararray, utf_chararray_unicode:chararray, bytearrayy:bytearray)')", "def crazy_tuple_identity(ct):", - " print ct", + " print(ct)", " return ct", "", "@outputSchema('mappy:map[]')", @@ -332,7 +336,7 @@ public class TestStreamingUDF { "", "@outputSchema('silly:chararray')", "def silly(silly_word):", - " print silly_word.encode('utf-8')", + " print(silly_word.encode('utf-8'))", " return silly_word", "", "", @@ -380,8 +384,13 @@ public class TestStreamingUDF { //Get First Bag Tuple Tuple innerTuple = bag.iterator().next(); assertEquals(5, innerTuple.size()); - - //Check one field in innermost tuple + + LOG.info("tuple: " + innerTuple); + //Check fields in innermost tuple assertEquals("Hello\\u2026Hello", ((Tuple) innerTuple.get(2)).get(6)); + assertEquals("Hello\u2026Hello", ((Tuple) innerTuple.get(2)).get(7)); + assertTrue(((Map<String, Object>)innerTuple.get(3)).containsKey("Weird\u2026Name")); + assertEquals("Weird \u2026 Value", ((Map<String, Object>)innerTuple.get(3)).get("Weird\u2026Name")); + assertEquals("\u2026", innerTuple.get(4)); } } Modified: pig/trunk/test/python/streaming/test_controller.py URL: http://svn.apache.org/viewvc/pig/trunk/test/python/streaming/test_controller.py?rev=1926452&r1=1926451&r2=1926452&view=diff ============================================================================== --- pig/trunk/test/python/streaming/test_controller.py (original) +++ pig/trunk/test/python/streaming/test_controller.py Mon Jun 16 02:59:22 2025 @@ -1,152 +1,154 @@ import unittest import controller -import StringIO import sys from datetime import datetime class TestDeserializer( unittest.TestCase ): def test__no_params(self): - input = "" + input = b"" expected_output = [] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__chararray(self): - input = "C1234" + input = b"C1234" expected_output = ["1234"] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__empty_chararray(self): - input = "C" + input = b"C" expected_output = [""] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__null_chararray(self): - input = "|-_" + input = b"|-_" expected_output = [None] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__boolean(self): - input = "Btrue" + input = b"Btrue" expected_output = [True] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__datetime_dateutil(self): controller.USE_DATEUTIL = True try: - from dateutil.tz import * + from dateutil.tz import tzutc from dateutil import parser input = "T2008-09-03T20:56:35.450+00:00" expected_output = [ datetime(2008, 9, 3, 20, 56, 35, 450000, tzinfo=tzutc()) ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) except ImportError: pass def test__datetime_datetime(self): controller.USE_DATEUTIL = False - input = "T2008-09-03T20:56:36.444+00:00" + input = b"T2008-09-03T20:56:36.444+00:00" expected_output = [ datetime(2008, 9, 3, 20, 56, 36, 444000) ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) controller.USE_DATEUTIL = True def test__two_elements(self): - input = "C032550737A79C543|\t_I970916083725" + input = b"C032550737A79C543|\t_I970916083725" expected_output = [ "032550737A79C543", 970916083725 ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__three_elements_one_null(self): - input = "C032550737A79C543|\t_I970916083725|\t_|-_" + input = b"C032550737A79C543|\t_I970916083725|\t_|-_" expected_output = [ "032550737A79C543", 970916083725, None ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__bag(self): - input = "|{_|(_C79C543|,_I9709|,_Crichard keith|)_|,_|(_C79C543|,_I97091|,_Cmicrosoft works|)_|,_|(_C79C543|,_I970|,_Csearch engines|)_|}_" + input = b"|{_|(_C79C543|,_I9709|,_Crichard keith|)_|,_|(_C79C543|,_I97091|,_Cmicrosoft works|)_|,_|(_C79C543|,_I970|,_Csearch engines|)_|}_" expected_output = [ [ ("79C543", 9709, "richard keith"), ("79C543", 97091, "microsoft works"), ("79C543", 970, "search engines") ] ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__two_elements_bags(self): - input = "|{_|(_C79C543|,_D97091608|,_Crichard keith frazine|)_|,_|(_C79C543|,_D97091609|,_Cmicrosoft works|)_|}_|\t_|{_|(_C79C543|,_D97091608|)_|,_|(_C79C543|,_D9709160|)_|}_" + input = b"|{_|(_C79C543|,_D97091608|,_Crichard keith frazine|)_|,_|(_C79C543|,_D97091609|,_Cmicrosoft works|)_|}_|\t_|{_|(_C79C543|,_D97091608|)_|,_|(_C79C543|,_D9709160|)_|}_" expected_output = [[("79C543",float(97091608),"richard keith frazine"), ("79C543", float(97091609), "microsoft works") ], [("79C543", float(97091608)), ("79C543", float(9709160))]] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__atomic_mixed_with_complex(self): - input = "|{_|(_C79C543|,_L970916083725|,_Crichard keith frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_|\t_I323|\t_|{_|(_C79C543|,_L970916083725|,_Crichard keith frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_" + input = b"|{_|(_C79C543|,_L970916083725|,_Crichard keith frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_|\t_I323|\t_|{_|(_C79C543|,_L970916083725|,_Crichard keith frazine|)_|,_|(_C79C543|,_L970916095254|,_Cmicrosoft works|)_|}_" expected_output = [ [ ("79C543", 970916083725, "richard keith frazine"), ("79C543", 970916095254, "microsoft works") ], 323, [ ("79C543", 970916083725, "richard keith frazine"), ("79C543", 970916095254, "microsoft works") ] ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__short_tuple(self): - input = "|(_I1|)_" + input = b"|(_I1|)_" expected_output = [ (1,) ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__nested_tuple(self): - input = "|(_|(_I123|,_Cabc|)_|,_|(_Cdef|,_I456|)_|)_" + input = b"|(_|(_I123|,_Cabc|)_|,_|(_Cdef|,_I456|)_|)_" expected_output = [ ( (123, "abc"), ("def", 456) ) ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__map(self): - input = "|[_Cname#CJohn|,_Cext#I5555|]_" + input = b"|[_Cname#CJohn|,_Cext#I5555|]_" expected_output = [ {"name":u"John", "ext":5555} ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__short_field_map(self): - input = "|[_Cn#CJohn|,_Ce#C5555|]_" + input = b"|[_Cn#CJohn|,_Ce#C5555|]_" expected_output = [ {"n":u"John", "e":u"5555"} ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__complex_map(self): - input = "|[_CD#|[_CA#I1|,_CB#CE|]_|,_CC#CF|]_" + input = b"|[_CD#|[_CA#I1|,_CB#CE|]_|,_CC#CF|]_" expected_output = [ { u"D": { u"A": 1, u"B": u"E" }, u"C": u"F" } ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__all_types(self): - input = "A123|\t_Btrue|\t_Cabc|\t_D4.0|\t_F5.0|\t_I32|\t_L45" - expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, 45L ] + input = b"A123|\t_Btrue|\t_Cabc|\t_D4.0|\t_F5.0|\t_I32|\t_L45" + if sys.version_info[0] < 3: + expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, long(45) ] + else: + expected_output = [ b"123", True, u"abc", 4.0, 5.0, 32, 45 ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__bad_types(self): - input = "K123" + input = b"K123" self.assertRaises(Exception, controller.deserialize_input, [input]) def test__error(self): self.maxDiff = None - input = "|[_Cspec#|{_|(_|[_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cnum_instances#I1|]_|)_|,_|(_|[_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cnum_instances#I1|]_|)_|}_|,_Chardware#Cm1.large|,_Caccount_id#I1234567|,_Clocation#Cus-east-1b|,_Cstatus#Cdestroyed|,_Cimage#Cus-east-1/ami-1234|,_Cjclouds_name#Cmhcdevelopment_1234|,_Cinstances#|{_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-1234|]_|)_|,_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-4321|]_|)_|}_|,_Cstop_timestamp#|-_|,_Cplan_code#Cstandard|,_C_id#I1234567890|,_Crunning_timestamp#|-_|,_Cuser_id#I1234|,_Cstart_timestamp#CTue Oct 25 19:26:18 UTC 2011|] _" + input = b"|[_Cspec#|{_|(_|[_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cnum_instances#I1|]_|)_|,_|(_|[_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cnum_instances#I1|]_|)_|}_|,_Chardware#Cm1.large|,_Caccount_id#I1234567|,_Clocation#Cus-east-1b|,_Cstatus#Cdestroyed|,_Cimage#Cus-east-1/ami-1234|,_Cjclouds_name#Cmhcdevelopment_1234|,_Cinstances#|{_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-datanode|)_|,_|(_Chadoop-tasktracker|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-1234|]_|)_|,_|(_|[_Cprivate_address#C10.10.10.10|,_Croles#|{_|(_Chadoop-namenode|)_|,_|(_Chadoop-jobtracker|)_|,_|(_Cpig-master|)_|}_|,_Cpublic_address#Cec2-10-10-10-10.compute-1.amazonaws.com|,_Cinstance_id#Cus-east-1/i-4321|]_|)_|}_|,_Cstop_timestamp#|-_|,_Cplan_code#Cstandard|,_C_id#I1234567890|,_Crunning_timestamp#|-_|,_Cuser_id#I1234|,_Cstart_timestamp#CTue Oct 25 19:26:18 UTC 2011| ]_" expected_output = [ {u'status': u'destroyed', u'start_timestamp': u'Tue Oct 25 19:26:18 UTC 2011', u'user_id': 1234, @@ -189,126 +191,148 @@ class TestDeserializer( unittest.TestCas u'jclouds_name': u'mhcdevelopment_1234', u'stop_timestamp': None}] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__empty_string(self): - input = "C" + input = b"C" expected_output = [ "" ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__string_with_hash(self): - input = "Cabc#!,|g|,(){}" + input = b"Cabc#!,|g|,(){}" expected_output = [ "abc#!,|g|,(){}" ] out = controller.deserialize_input(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) class TestSerializeOutput( unittest.TestCase ): def test__chararray(self): input = "1234" - expected_output = "1234" + expected_output = b"1234" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__empty_chararray(self): input = "" - expected_output = "" + expected_output = b"" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__null_chararray(self): input = None - expected_output = "|-_" + expected_output = b"|-_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__num(self): input = 1234 - expected_output = "1234" + expected_output = b"1234" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__bool_true(self): input = True - expected_output = "true" + expected_output = b"true" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__bool_false(self): input = False - expected_output = "false" + expected_output = b"false" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__datetime(self): d = datetime.now() input = (d,) - expected_output = "|(_%s|)_" % d.isoformat() + if sys.version_info[0] >= 3: + expected_output = b"|(_%s|)_" % bytes(d.isoformat(), 'utf-8') + else: + expected_output = "|(_%s|)_" % d.isoformat() out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__tuple(self): input = (1234, "abc") - expected_output = "|(_1234|,_abc|)_" + expected_output = b"|(_1234|,_abc|)_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__short_tuple(self): input = (1,) - expected_output = "|(_1|)_" + expected_output = b"|(_1|)_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__bag(self): input = [1234, "abc"] - expected_output = "|{_1234|,_abc|}_" + expected_output = b"|{_|(_1234|)_|,_|(_abc|)_|}_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__nested_tuple(self): input = [(32,12,'abc'), 32, ['abc', 'def', 'ghi']] - expected_output = "|{_|(_32|,_12|,_abc|)_|,_32|,_|{_abc|,_def|,_ghi|}_|}_" + expected_output = b"|{_|(_32|,_12|,_abc|)_|,_|(_32|)_|,_|(_|{_|(_abc|)_|,_|(_def|)_|,_|(_ghi|)_|}_|)_|}_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__map(self): input = {'a': 1, 'b':'z'} - expected_output = "|[_a#1|,_b#z|]_" + expected_output = b"|[_a#1|,_b#z|]_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) def test__bug(self): - input = (None, 32, 98765432109876543210L) - expected_output = "|(_|-_|,_32|,_98765432109876543210|)_" + input = (None, 32, 98765432109876543210) + expected_output = b"|(_|-_|,_32|,_98765432109876543210|)_" out = controller.serialize_output(input) - self.assertEquals(expected_output, out) + self.assertEqual(expected_output, out) class TestReadInput( unittest.TestCase ): def test__multiline_record(self): cont = controller.PythonStreamingController() - inputio = StringIO.StringIO() - inputio.write('12\n') - inputio.write('34\n') - inputio.write('5|_\n') + #Handle Python 2 vs 3 differences + #Python 3 controller expects bytes, 2 expects str + if sys.version_info[0] >= 3: + from io import BytesIO + inputio = BytesIO() + inputio.write(b'12\n') + inputio.write(b'34\n') + inputio.write(b'5|_\n') + else: + from StringIO import StringIO + inputio = StringIO() + inputio.write('12\n') + inputio.write('34\n') + inputio.write('5|_\n') inputio.seek(0) cont.input_stream = inputio cont.output_stream = sys.stdout out = cont.get_next_input() - self.assertEquals('12\n34\n5', out) + self.assertEqual(b'12\n34\n5', out) def test__complexmultiline_record(self): cont = controller.PythonStreamingController() - inputio = StringIO.StringIO() - inputio.write('|{_|(_32|,_12|,_a\n') - inputio.write('bc|)_|,_32|,_|{_ab\n') - inputio.write('c|,_def|,_gh\n') - inputio.write('i|}_|}_|_\n') + if sys.version_info[0] >= 3: + from io import BytesIO + inputio = BytesIO() + inputio.write(b'|{_|(_32|,_12|,_a\n') + inputio.write(b'bc|)_|,_32|,_|{_ab\n') + inputio.write(b'c|,_def|,_gh\n') + inputio.write(b'i|}_|}_|_\n') + else: + from StringIO import StringIO + inputio = StringIO() + inputio.write('|{_|(_32|,_12|,_a\n') + inputio.write('bc|)_|,_32|,_|{_ab\n') + inputio.write('c|,_def|,_gh\n') + inputio.write('i|}_|}_|_\n') inputio.seek(0) cont.input_stream = inputio cont.output_stream = sys.stdout out = cont.get_next_input() - self.assertEquals('|{_|(_32|,_12|,_a\nbc|)_|,_32|,_|{_ab\nc|,_def|,_gh\ni|}_|}_', out) + self.assertEqual(b'|{_|(_32|,_12|,_a\nbc|)_|,_32|,_|{_ab\nc|,_def|,_gh\ni|}_|}_', out)