This is an automated email from the ASF dual-hosted git repository. rskraba pushed a commit to branch branch-1.9 in repository https://gitbox.apache.org/repos/asf/avro.git
commit e9eee386a14b592a7cca55a1214c14c10d03a3b2 Author: Michael A. Smith <[email protected]> AuthorDate: Tue Oct 22 08:49:44 2019 -0400 AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683) --- lang/py/scripts/avro | 24 +++++---- lang/py/setup.py | 7 ++- lang/py/src/avro/__init__.py | 10 ++-- lang/py/src/avro/constants.py | 13 +++-- lang/py/src/avro/datafile.py | 2 +- lang/py/src/avro/io.py | 63 ++++++++++++---------- lang/py/src/avro/ipc.py | 27 ++++++---- lang/py/src/avro/protocol.py | 32 +++++------ lang/py/src/avro/schema.py | 31 +++++++---- lang/py/src/avro/tether/__init__.py | 15 +++--- lang/py/src/avro/tether/tether_task.py | 41 +++++++------- lang/py/src/avro/tether/tether_task_runner.py | 61 ++++++++++----------- lang/py/src/avro/tether/util.py | 37 ++++++------- lang/py/src/avro/timezones.py | 9 +++- lang/py/src/avro/tool.py | 45 +++++++++------- lang/py/src/avro/txipc.py | 13 +++-- lang/py/{src/avro => test}/__init__.py | 10 ++-- lang/py/test/av_bench.py | 7 ++- lang/py/test/gen_interop_data.py | 40 +++++++++++--- lang/py/test/mock_tether_parent.py | 36 ++++++------- lang/py/test/sample_http_client.py | 7 ++- lang/py/test/sample_http_server.py | 3 ++ lang/py/test/set_avro_test_path.py | 7 +++ lang/py/test/test_datafile.py | 77 +++++++++++++++------------ lang/py/test/test_datafile_interop.py | 43 +++++++++------ lang/py/test/test_io.py | 59 ++++++++++---------- lang/py/test/test_ipc.py | 7 +++ lang/py/test/test_script.py | 11 ++-- lang/py/test/test_tether_task.py | 39 ++++++++------ lang/py/test/test_tether_task_runner.py | 67 ++++++++++++----------- lang/py/test/test_tether_word_count.py | 15 +++--- lang/py/test/txsample_http_client.py | 5 ++ lang/py/test/txsample_http_server.py | 3 ++ lang/py/test/word_count_task.py | 49 +++++++++-------- 34 files changed, 533 insertions(+), 382 deletions(-) diff --git a/lang/py/scripts/avro b/lang/py/scripts/avro index b320a39..1aac028 100644 --- a/lang/py/scripts/avro +++ b/lang/py/scripts/avro @@ -18,16 +18,19 @@ """Command line utility for reading and writing Avro files.""" -from avro.io import DatumReader, DatumWriter -from avro.datafile import DataFileReader, DataFileWriter -import avro.schema +from __future__ import absolute_import, division, print_function -import json import csv -from sys import stdout, stdin -from itertools import ifilter, imap +import json from functools import partial +from itertools import ifilter, imap from os.path import splitext +from sys import stdin, stdout + +import avro.schema +from avro.datafile import DataFileReader, DataFileWriter +from avro.io import DatumReader, DatumWriter + class AvroError(Exception): pass @@ -52,9 +55,9 @@ def print_csv(row): def select_printer(format): return { - "json" : print_json, - "json-pretty" : print_json_pretty, - "csv" : print_csv + "json": print_json, + "json-pretty": print_json_pretty, + "csv": print_csv }[format] def record_match(expr, record): @@ -102,7 +105,7 @@ def print_avro(avro, opts): def print_schema(avro): schema = avro.meta["avro.schema"] # Pretty print - print json.dumps(json.loads(schema), indent=4) + print(json.dumps(json.loads(schema), indent=4)) def cat(opts, args): if not args: @@ -257,4 +260,3 @@ def main(argv=None): if __name__ == "__main__": main() - diff --git a/lang/py/setup.py b/lang/py/setup.py index c2e07c6..b978092 100755 --- a/lang/py/setup.py +++ b/lang/py/setup.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -7,9 +8,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +18,8 @@ # limitations under the License. +from __future__ import absolute_import, division, print_function + import distutils.errors import glob import os diff --git a/lang/py/src/avro/__init__.py b/lang/py/src/avro/__init__.py index 47d1295..9a859e9 100644 --- a/lang/py/src/avro/__init__.py +++ b/lang/py/src/avro/__init__.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,14 +8,15 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones'] +from __future__ import absolute_import, division, print_function +__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones'] diff --git a/lang/py/src/avro/constants.py b/lang/py/src/avro/constants.py index 66e31df..2197201 100644 --- a/lang/py/src/avro/constants.py +++ b/lang/py/src/avro/constants.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,18 +8,18 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" -Contains Constants for Python Avro -""" +"""Contains Constants for Python Avro""" + +from __future__ import absolute_import, division, print_function DATE = "date" DECIMAL = "decimal" diff --git a/lang/py/src/avro/datafile.py b/lang/py/src/avro/datafile.py index 75a8e4a..a9c9c22 100644 --- a/lang/py/src/avro/datafile.py +++ b/lang/py/src/avro/datafile.py @@ -401,4 +401,4 @@ def generate_sixteen_random_bytes(): try: return os.urandom(16) except NotImplementedError: - return [chr(random.randrange(256)) for i in range(16)] + return bytes(random.randrange(256) for i in range(16)) diff --git a/lang/py/src/avro/io.py b/lang/py/src/avro/io.py index c36c5b3..b18b148 100644 --- a/lang/py/src/avro/io.py +++ b/lang/py/src/avro/io.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -38,6 +41,8 @@ uses the following mapping: * Schema booleans are implemented as bool. """ +from __future__ import absolute_import, division, print_function + import datetime import json import struct @@ -183,7 +188,7 @@ class BinaryDecoder(object): def read_boolean(self): """ - a boolean is written as a single byte + a boolean is written as a single byte whose value is either 0 (false) or 1 (true). """ return ord(self.read(1)) == 1 @@ -261,7 +266,7 @@ class BinaryDecoder(object): def read_bytes(self): """ - Bytes are encoded as a long followed by that many bytes of data. + Bytes are encoded as a long followed by that many bytes of data. """ return self.read(self.read_long()) @@ -297,7 +302,7 @@ class BinaryDecoder(object): def read_time_millis_from_int(self): """ - int is decoded as python time object which represents + int is decoded as python time object which represents the number of milliseconds after midnight, 00:00:00.000. """ milliseconds = self.read_int() @@ -305,7 +310,7 @@ class BinaryDecoder(object): def read_time_micros_from_long(self): """ - long is decoded as python time object which represents + long is decoded as python time object which represents the number of microseconds after midnight, 00:00:00.000000. """ microseconds = self.read_long() @@ -313,17 +318,17 @@ class BinaryDecoder(object): def read_timestamp_millis_from_long(self): """ - long is decoded as python datetime object which represents + long is decoded as python datetime object which represents the number of milliseconds from the unix epoch, 1 January 1970. """ timestamp_millis = self.read_long() timedelta = datetime.timedelta(microseconds=timestamp_millis * 1000) - unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc) + unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc) return unix_epoch_datetime + timedelta def read_timestamp_micros_from_long(self): """ - long is decoded as python datetime object which represents + long is decoded as python datetime object which represents the number of microseconds from the unix epoch, 1 January 1970. """ timestamp_micros = self.read_long() @@ -386,10 +391,10 @@ class BinaryEncoder(object): null is written as zero bytes """ pass - + def write_boolean(self, datum): """ - a boolean is written as a single byte + a boolean is written as a single byte whose value is either 0 (false) or 1 (true). """ if datum: @@ -399,7 +404,7 @@ class BinaryEncoder(object): def write_int(self, datum): """ - int and long values are written using variable-length, zig-zag coding. + int and long values are written using variable-length, zig-zag coding. """ self.write_long(datum); @@ -491,7 +496,7 @@ class BinaryEncoder(object): bits_to_write = unscaled_datum >> (8 * index) self.write(chr(bits_to_write & 0xff)) else: - for i in range(offset_bits/8): + for i in range(offset_bits // 8): self.write(chr(0)) for index in range(bytes_req-1, -1, -1): bits_to_write = unscaled_datum >> (8 * index) @@ -499,7 +504,7 @@ class BinaryEncoder(object): def write_bytes(self, datum): """ - Bytes are encoded as a long followed by that many bytes of data. + Bytes are encoded as a long followed by that many bytes of data. """ self.write_long(len(datum)) self.write(struct.pack('%ds' % len(datum), datum)) @@ -591,32 +596,32 @@ class DatumReader(object): and w_type == r_type): return True elif (w_type == r_type == 'record' and - DatumReader.check_props(writers_schema, readers_schema, + DatumReader.check_props(writers_schema, readers_schema, ['fullname'])): return True elif (w_type == r_type == 'error' and - DatumReader.check_props(writers_schema, readers_schema, + DatumReader.check_props(writers_schema, readers_schema, ['fullname'])): return True elif (w_type == r_type == 'request'): return True - elif (w_type == r_type == 'fixed' and - DatumReader.check_props(writers_schema, readers_schema, + elif (w_type == r_type == 'fixed' and + DatumReader.check_props(writers_schema, readers_schema, ['fullname', 'size'])): return True - elif (w_type == r_type == 'enum' and - DatumReader.check_props(writers_schema, readers_schema, + elif (w_type == r_type == 'enum' and + DatumReader.check_props(writers_schema, readers_schema, ['fullname'])): return True - elif (w_type == r_type == 'map' and + elif (w_type == r_type == 'map' and DatumReader.check_props(writers_schema.values, readers_schema.values, ['type'])): return True - elif (w_type == r_type == 'array' and + elif (w_type == r_type == 'array' and DatumReader.check_props(writers_schema.items, readers_schema.items, ['type'])): return True - + # Handle schema promotion if w_type == 'int' and r_type in ['long', 'float', 'double']: return True @@ -633,7 +638,7 @@ class DatumReader(object): reader the "reader's schema". """ self._writers_schema = writers_schema - self._readers_schema = readers_schema + self._readers_schema = readers_schema # read/write properties def set_writers_schema(self, writers_schema): @@ -644,7 +649,7 @@ class DatumReader(object): self._readers_schema = readers_schema readers_schema = property(lambda self: self._readers_schema, set_readers_schema) - + def read(self, decoder): if self.readers_schema is None: self.readers_schema = self.writers_schema @@ -682,13 +687,13 @@ class DatumReader(object): else: return decoder.read_int() elif writers_schema.type == 'long': - if (hasattr(writers_schema, 'logical_type') and + if (hasattr(writers_schema, 'logical_type') and writers_schema.logical_type == constants.TIME_MICROS): return decoder.read_time_micros_from_long() - elif (hasattr(writers_schema, 'logical_type') and + elif (hasattr(writers_schema, 'logical_type') and writers_schema.logical_type == constants.TIMESTAMP_MILLIS): return decoder.read_timestamp_millis_from_long() - elif (hasattr(writers_schema, 'logical_type') and + elif (hasattr(writers_schema, 'logical_type') and writers_schema.logical_type == constants.TIMESTAMP_MICROS): return decoder.read_timestamp_micros_from_long() else: @@ -886,7 +891,7 @@ class DatumReader(object): % (index_of_schema, len(writers_schema.schemas)) raise SchemaResolutionException(fail_msg, writers_schema, readers_schema) selected_writers_schema = writers_schema.schemas[index_of_schema] - + # read data return self.read_data(selected_writers_schema, readers_schema, decoder) @@ -914,7 +919,7 @@ class DatumReader(object): * if the reader's record schema has a field that contains a default value, and writer's schema does not have a field with the same name, then the reader should use the default value from its field. - * if the reader's record schema has a field with no default value, and + * if the reader's record schema has a field with no default value, and writer's schema does not have a field with the same name, then the field's value is unset. """ @@ -933,7 +938,7 @@ class DatumReader(object): if len(readers_fields_dict) > len(read_record): writers_fields_dict = writers_schema.fields_dict for field_name, field in readers_fields_dict.items(): - if not writers_fields_dict.has_key(field_name): + if field_name not in writers_fields_dict: if field.has_default: field_val = self._read_default_value(field.type, field.default) read_record[field.name] = field_val diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py index 8cbf07b..1bba4f7 100644 --- a/lang/py/src/avro/ipc.py +++ b/lang/py/src/avro/ipc.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,17 +8,19 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -""" -Support for inter-process calls. -""" + +"""Support for inter-process calls.""" + +from __future__ import absolute_import, division, print_function + import httplib from avro import io, protocol, schema @@ -189,7 +194,7 @@ class BaseRequestor(object): * a one-byte error flag boolean, followed by either: o if the error flag is false, the message response, serialized per the message's response schema. - o if the error flag is true, + o if the error flag is true, the error, serialized per the message's error union schema. """ # response metadata @@ -267,11 +272,11 @@ class Responder(object): buffer_encoder = io.BinaryEncoder(buffer_writer) error = None response_metadata = {} - + try: remote_protocol = self.process_handshake(buffer_decoder, buffer_encoder) # handshake failure - if remote_protocol is None: + if remote_protocol is None: return buffer_writer.getvalue() # read request using remote protocol @@ -296,9 +301,9 @@ class Responder(object): # perform server logic try: response = self.invoke(local_message, request) - except AvroRemoteException, e: + except AvroRemoteException as e: error = e - except Exception, e: + except Exception as e: error = AvroRemoteException(str(e)) # write response using local protocol @@ -310,7 +315,7 @@ class Responder(object): else: writers_schema = local_message.errors self.write_error(writers_schema, error, buffer_encoder) - except schema.AvroException, e: + except schema.AvroException as e: error = AvroRemoteException(str(e)) buffer_encoder = io.BinaryEncoder(StringIO()) META_WRITER.write(response_metadata, buffer_encoder) diff --git a/lang/py/src/avro/protocol.py b/lang/py/src/avro/protocol.py index 117401c..9b27a45 100644 --- a/lang/py/src/avro/protocol.py +++ b/lang/py/src/avro/protocol.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python - -## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -21,10 +18,15 @@ from __future__ import absolute_import, division, print_function -import hashlib import json -import avro.schema +from avro import schema + +try: + from hashlib import md5 +except ImportError: + from md5 import md5 + # # Constants @@ -37,7 +39,7 @@ VALID_TYPE_SCHEMA_TYPES = ('enum', 'record', 'error', 'fixed') # Exceptions # -class ProtocolParseException(avro.schema.AvroException): +class ProtocolParseException(schema.AvroException): pass # @@ -49,7 +51,7 @@ class Protocol(object): def _parse_types(self, types, type_names): type_objects = [] for type in types: - type_object = avro.schema.make_avsc_object(type, type_names) + type_object = schema.make_avsc_object(type, type_names) if type_object.type not in VALID_TYPE_SCHEMA_TYPES: fail_msg = 'Type %s not an enum, fixed, record, or error.' % type raise ProtocolParseException(fail_msg) @@ -92,7 +94,7 @@ class Protocol(object): self._props = {} self.set_prop('name', name) - type_names = avro.schema.Names() + type_names = schema.Names() if namespace is not None: self.set_prop('namespace', namespace) type_names.default_namespace = namespace @@ -100,13 +102,13 @@ class Protocol(object): self.set_prop('types', self._parse_types(types, type_names)) if messages is not None: self.set_prop('messages', self._parse_messages(messages, type_names)) - self._md5 = hashlib.md5(str(self)).digest() + self._md5 = md5(str(self)).digest() # read-only properties name = property(lambda self: self.get_prop('name')) namespace = property(lambda self: self.get_prop('namespace')) fullname = property(lambda self: - avro.schema.Name(self.name, self.namespace).fullname) + schema.Name(self.name, self.namespace).fullname) types = property(lambda self: self.get_prop('types')) types_dict = property(lambda self: dict([(type.name, type) for type in self.types])) @@ -123,7 +125,7 @@ class Protocol(object): def to_json(self): to_dump = {} to_dump['protocol'] = self.name - names = avro.schema.Names(default_namespace=self.namespace) + names = schema.Names(default_namespace=self.namespace) if self.namespace: to_dump['namespace'] = self.namespace if self.types: @@ -148,20 +150,20 @@ class Message(object): if not isinstance(request, list): fail_msg = 'Request property not a list: %s' % request raise ProtocolParseException(fail_msg) - return avro.schema.RecordSchema(None, None, request, names, 'request') + return schema.RecordSchema(None, None, request, names, 'request') def _parse_response(self, response, names): if isinstance(response, basestring) and names.has_name(response, None): return names.get_name(response, None) else: - return avro.schema.make_avsc_object(response, names) + return schema.make_avsc_object(response, names) def _parse_errors(self, errors, names): if not isinstance(errors, list): fail_msg = 'Errors property not a list: %s' % errors raise ProtocolParseException(fail_msg) errors_for_parsing = {'type': 'error_union', 'declared_errors': errors} - return avro.schema.make_avsc_object(errors_for_parsing, names) + return schema.make_avsc_object(errors_for_parsing, names) def __init__(self, name, request, response, errors=None, names=None): self._name = name @@ -190,7 +192,7 @@ class Message(object): def to_json(self, names=None): if names is None: - names = avro.schema.Names() + names = schema.Names() to_dump = {} to_dump['request'] = self.request.to_json(names) to_dump['response'] = self.response.to_json(names) diff --git a/lang/py/src/avro/schema.py b/lang/py/src/avro/schema.py index 822d76c..06c1aeb 100644 --- a/lang/py/src/avro/schema.py +++ b/lang/py/src/avro/schema.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -35,7 +38,10 @@ A schema may be one of: Null. """ +from __future__ import absolute_import, division, print_function + import json +import sys from math import floor, log10 from avro import constants @@ -231,11 +237,11 @@ class Names(object): def has_name(self, name_attr, space_attr): test = Name(name_attr, space_attr, self.default_namespace).fullname - return self.names.has_key(test) + return test in self.names def get_name(self, name_attr, space_attr): test = Name(name_attr, space_attr, self.default_namespace).fullname - if not self.names.has_key(test): + if test not in self.names: return None return self.names[test] @@ -270,7 +276,7 @@ class Names(object): if to_add.fullname in VALID_TYPES: fail_msg = '%s is a reserved type name.' % to_add.fullname raise SchemaParseException(fail_msg) - elif self.names.has_key(to_add.fullname): + elif to_add.fullname in self.names: fail_msg = 'The name "%s" is already in use.' % to_add.fullname raise SchemaParseException(fail_msg) @@ -377,7 +383,7 @@ class Field(object): else: try: type_schema = make_avsc_object(type, names) - except Exception, e: + except Exception as e: fail_msg = 'Type property "%s" not a valid Avro schema: %s' % (type, e) raise SchemaParseException(fail_msg) self.set_prop('type', type_schema) @@ -578,7 +584,7 @@ class ArraySchema(Schema): else: try: items_schema = make_avsc_object(items, names) - except SchemaParseException, e: + except SchemaParseException as e: fail_msg = 'Items schema (%s) not a valid Avro schema: %s (known names: %s)' % (items, e, names.names.keys()) raise SchemaParseException(fail_msg) @@ -652,7 +658,7 @@ class UnionSchema(Schema): else: try: new_schema = make_avsc_object(schema, names) - except Exception, e: + except Exception as e: raise SchemaParseException('Union item must be a valid Avro schema: %s' % str(e)) # check the new schema if (new_schema.type in VALID_TYPES and new_schema.type not in NAMED_TYPES @@ -708,7 +714,7 @@ class RecordSchema(NamedSchema): # null values can have a default value of None has_default = False default = None - if field.has_key('default'): + if 'default' in field: has_default = True default = field.get('default') @@ -978,10 +984,13 @@ def parse(json_string): # parse the JSON try: json_data = json.loads(json_string) - except Exception, e: - import sys - raise SchemaParseException('Error parsing JSON: %s, error = %s' - % (json_string, e)), None, sys.exc_info()[2] + except Exception as e: + msg = 'Error parsing JSON: {}, error = {}'.format(json_string, e) + new_exception = SchemaParseException(msg) + traceback = sys.exc_info()[2] + if not hasattr(new_exception, 'with_traceback'): + raise (new_exception, None, traceback) # Python 2 syntax + raise new_exception.with_traceback(traceback) # Initialize the names object names = Names() diff --git a/lang/py/src/avro/tether/__init__.py b/lang/py/src/avro/tether/__init__.py index 0dbd3d8..c60edf9 100644 --- a/lang/py/src/avro/tether/__init__.py +++ b/lang/py/src/avro/tether/__init__.py @@ -1,4 +1,6 @@ -# +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,12 +17,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# -from .util import * -from .tether_task import * -from .tether_task_runner import * +from __future__ import absolute_import, division, print_function -__all__=util.__all__ -__all__+=tether_task.__all__ -__all__+=tether_task_runner.__all__ +from avro.tether.tether_task import HTTPRequestor, TaskType, TetherTask, inputProtocol, outputProtocol +from avro.tether.tether_task_runner import TaskRunner +from avro.tether.util import find_port diff --git a/lang/py/src/avro/tether/tether_task.py b/lang/py/src/avro/tether/tether_task.py index 23112a7..4e2004d 100644 --- a/lang/py/src/avro/tether/tether_task.py +++ b/lang/py/src/avro/tether/tether_task.py @@ -1,22 +1,23 @@ -""" - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -""" - -__all__=["TetherTask","TaskType","inputProtocol","outputProtocol","HTTPRequestor"] +#!/usr/bin/env python + +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function import collections import io as pyio @@ -30,6 +31,8 @@ from StringIO import StringIO from avro import io as avio from avro import ipc, protocol, schema +__all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol", "HTTPRequestor"] + # create protocol objects for the input and output protocols # The build process should copy InputProtocol.avpr and OutputProtocol.avpr # into the same directory as this module diff --git a/lang/py/src/avro/tether/tether_task_runner.py b/lang/py/src/avro/tether/tether_task_runner.py index b248ffd..64bee7b 100644 --- a/lang/py/src/avro/tether/tether_task_runner.py +++ b/lang/py/src/avro/tether/tether_task_runner.py @@ -1,30 +1,23 @@ -""" - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -""" - -__all__=["TaskRunner"] - -if __name__ == "__main__": - # Relative imports don't work when being run directly - from avro import tether - from avro.tether import TetherTask, find_port, inputProtocol - -else: - from . import TetherTask, find_port, inputProtocol +#!/usr/bin/env python + +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function import logging import sys @@ -33,12 +26,16 @@ import traceback import weakref from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +import avro.tether.tether_task +import avro.tether.util from avro import ipc +__all__ = ["TaskRunner"] + class TaskRunnerResponder(ipc.Responder): """ - The responder for the thethered process + The responder for the tethered process """ def __init__(self,runner): """ @@ -46,7 +43,7 @@ class TaskRunnerResponder(ipc.Responder): ---------------------------------------------------------- runner - Instance of TaskRunner """ - ipc.Responder.__init__(self, inputProtocol) + ipc.Responder.__init__(self, avro.tether.tether_task.inputProtocol) self.log=logging.getLogger("TaskRunnerResponder") @@ -148,7 +145,7 @@ class TaskRunner(object): self.log=logging.getLogger("TaskRunner:") - if not(isinstance(task,TetherTask)): + if not(isinstance(task, avro.tether.tether_task.TetherTask)): raise ValueError("task must be an instance of tether task") self.task=task @@ -172,7 +169,7 @@ class TaskRunner(object): testing """ - port=find_port() + port = avro.tether.util.find_port() address=("localhost",port) @@ -212,7 +209,7 @@ if __name__ == '__main__': logging.basicConfig(level=logging.INFO) if (len(sys.argv)<=1): - print "Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass" + print("Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass") raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass") fullcls=sys.argv[1] diff --git a/lang/py/src/avro/tether/util.py b/lang/py/src/avro/tether/util.py index cbeeef0..3d8ad3a 100644 --- a/lang/py/src/avro/tether/util.py +++ b/lang/py/src/avro/tether/util.py @@ -1,22 +1,23 @@ -""" - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -""" +#!/usr/bin/env python -__all__=["find_port"] +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function import socket diff --git a/lang/py/src/avro/timezones.py b/lang/py/src/avro/timezones.py index a4985b4..a306f6d 100644 --- a/lang/py/src/avro/timezones.py +++ b/lang/py/src/avro/timezones.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,15 +8,17 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + from datetime import datetime, timedelta, tzinfo diff --git a/lang/py/src/avro/tool.py b/lang/py/src/avro/tool.py index 6a92fee..3c0c228 100644 --- a/lang/py/src/avro/tool.py +++ b/lang/py/src/avro/tool.py @@ -1,4 +1,6 @@ -#! /usr/bin/env python +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -6,20 +8,23 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + """ Command-line tool NOTE: The API for the command-line tool is experimental. """ +from __future__ import absolute_import, division, print_function + import sys import threading import urlparse @@ -37,7 +42,7 @@ class GenericResponder(ipc.Responder): def invoke(self, message, request): if message.name == self.msg: - print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum) + print("Message: %s Datum: %s" % (message.name, self.datum), file=sys.stderr) # server will shut down after processing a single Avro request global server_should_shutdown server_should_shutdown = True @@ -55,7 +60,7 @@ class GenericHandler(BaseHTTPRequestHandler): resp_writer = ipc.FramedWriter(self.wfile) resp_writer.write_framed_message(resp_body) if server_should_shutdown: - print >> sys.stderr, "Shutting down server." + print("Shutting down server.", file=sys.stderr) quitter = threading.Thread(target=self.server.shutdown) quitter.daemon = True quitter.start() @@ -68,10 +73,10 @@ def run_server(uri, proto, msg, datum): server_should_shutdown = False responder = GenericResponder(proto, msg, datum) server = HTTPServer(server_addr, GenericHandler) - print "Port: %s" % server.server_port + print("Port: %s" % server.server_port) sys.stdout.flush() server.allow_reuse_address = True - print >> sys.stderr, "Starting server." + print("Starting server.", file=sys.stderr) server.serve_forever() def send_message(uri, proto, msg, datum): @@ -79,7 +84,7 @@ def send_message(uri, proto, msg, datum): client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port) proto_json = file(proto, 'r').read() requestor = ipc.Requestor(protocol.parse(proto_json), client) - print requestor.request(msg, datum) + print(requestor.request(msg, datum)) def file_or_stdin(f): if f == "-": @@ -89,20 +94,20 @@ def file_or_stdin(f): def main(args=sys.argv): if len(args) == 1: - print "Usage: %s [dump|rpcreceive|rpcsend]" % args[0] + print("Usage: %s [dump|rpcreceive|rpcsend]" % args[0]) return 1 if args[1] == "dump": if len(args) != 3: - print "Usage: %s dump input_file" % args[0] + print("Usage: %s dump input_file" % args[0]) return 1 for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()): - print repr(d) + print(repr(d)) elif args[1] == "rpcreceive": usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0] usage_str += "message_name (-data d | -file f)" if len(args) not in [5, 7]: - print usage_str + print(usage_str) return 1 uri, proto, msg = args[2:5] datum = None @@ -111,19 +116,19 @@ def main(args=sys.argv): reader = open(args[6], 'rb') datum_reader = io.DatumReader() dfr = datafile.DataFileReader(reader, datum_reader) - datum = dfr.next() + datum = next(dfr) elif args[5] == "-data": - print "JSON Decoder not yet implemented." + print("JSON Decoder not yet implemented.") return 1 else: - print usage_str + print(usage_str) return 1 run_server(uri, proto, msg, datum) elif args[1] == "rpcsend": usage_str = "Usage: %s rpcsend uri protocol_file " % args[0] usage_str += "message_name (-data d | -file f)" if len(args) not in [5, 7]: - print usage_str + print(usage_str) return 1 uri, proto, msg = args[2:5] datum = None @@ -132,15 +137,15 @@ def main(args=sys.argv): reader = open(args[6], 'rb') datum_reader = io.DatumReader() dfr = datafile.DataFileReader(reader, datum_reader) - datum = dfr.next() + datum = next(dfr) elif args[5] == "-data": - print "JSON Decoder not yet implemented." + print("JSON Decoder not yet implemented.") return 1 else: - print usage_str + print(usage_str) return 1 send_message(uri, proto, msg, datum) return 0 - + if __name__ == "__main__": sys.exit(main(sys.argv)) diff --git a/lang/py/src/avro/txipc.py b/lang/py/src/avro/txipc.py index 72d63a6..66ca726 100644 --- a/lang/py/src/avro/txipc.py +++ b/lang/py/src/avro/txipc.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -15,10 +16,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -try: - from cStringIO import StringIO -except ImportError: - from StringIO import StringIO + +from __future__ import absolute_import, division, print_function + from zope.interface import implements from avro import io, ipc @@ -29,6 +29,11 @@ from twisted.web.client import Agent from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer +try: + from cStringIO import StringIO +except ImportError: + from StringIO import StringIO + class TwistedRequestor(ipc.BaseRequestor): """A Twisted-compatible requestor. Returns a Deferred that will fire with the diff --git a/lang/py/src/avro/__init__.py b/lang/py/test/__init__.py similarity index 89% copy from lang/py/src/avro/__init__.py copy to lang/py/test/__init__.py index 47d1295..a2a5bef 100644 --- a/lang/py/src/avro/__init__.py +++ b/lang/py/test/__init__.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,14 +8,11 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones'] - diff --git a/lang/py/test/av_bench.py b/lang/py/test/av_bench.py index e90c987..1e6a05d 100644 --- a/lang/py/test/av_bench.py +++ b/lang/py/test/av_bench.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,6 +17,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + import sys import time from random import choice, randint, sample @@ -72,5 +75,5 @@ def t(f, *args): if __name__ == "__main__": n = int(sys.argv[1]) - print "Write %0.4f" % t(write, n) - print "Read %0.4f" % t(read) + print("Write %0.4f" % t(write, n)) + print("Read %0.4f" % t(read)) diff --git a/lang/py/test/gen_interop_data.py b/lang/py/test/gen_interop_data.py index 336434e..13bf86c 100644 --- a/lang/py/test/gen_interop_data.py +++ b/lang/py/test/gen_interop_data.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,15 +14,33 @@ # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import absolute_import, division, print_function + +import os import sys from avro import datafile, io, schema +CODECS_TO_VALIDATE = ('null', 'deflate') + +try: + import snappy + CODECS_TO_VALIDATE += ('snappy',) +except ImportError: + print('Snappy not present, will skip generating it.') +try: + import zstandard + CODECS_TO_VALIDATE += ('zstandard',) +except ImportError: + print('Zstandard not present, will skip generating it.') + DATUM = { 'intField': 12, - 'longField': 15234324L, + 'longField': 15234324, 'stringField': unicode('hey'), 'boolField': True, 'floatField': 1234.0, @@ -37,10 +56,15 @@ DATUM = { } if __name__ == "__main__": - interop_schema = schema.parse(open(sys.argv[1], 'r').read()) - writer = open(sys.argv[2], 'wb') - datum_writer = io.DatumWriter() - # NB: not using compression - dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema) - dfw.append(DATUM) - dfw.close() + for codec in CODECS_TO_VALIDATE: + interop_schema = schema.parse(open(sys.argv[1], 'r').read()) + filename = sys.argv[2] + if codec != 'null': + base, ext = os.path.splitext(filename) + filename = base + "_" + codec + ext + writer = open(filename, 'wb') + datum_writer = io.DatumWriter() + # NB: not using compression + dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema, codec=codec) + dfw.append(DATUM) + dfw.close() diff --git a/lang/py/test/mock_tether_parent.py b/lang/py/test/mock_tether_parent.py index c82e249..88d84dd 100644 --- a/lang/py/test/mock_tether_parent.py +++ b/lang/py/test/mock_tether_parent.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,45 +17,36 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + import socket import sys from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +import avro.tether.tether_task +import avro.tether.util import set_avro_test_path -from avro import ipc, protocol, tether - - -def find_port(): - """ - Return an unbound port - """ - s=socket.socket() - s.bind(("127.0.0.1",0)) - - port=s.getsockname()[1] - s.close() - - return port +from avro import ipc, protocol -SERVER_ADDRESS = ('localhost', find_port()) +SERVER_ADDRESS = ('localhost', avro.tether.util.find_port()) class MockParentResponder(ipc.Responder): """ The responder for the mocked parent """ def __init__(self): - ipc.Responder.__init__(self, tether.outputProtocol) + ipc.Responder.__init__(self, avro.tether.tether_task.outputProtocol) def invoke(self, message, request): if message.name=='configure': - print "MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"]) + print("MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"])) elif message.name=='status': - print "MockParentResponder: Recieved 'status': message={0}".format(request["message"]) + print("MockParentResponder: Recieved 'status': message={0}".format(request["message"])) elif message.name=='fail': - print "MockParentResponder: Recieved 'fail': message={0}".format(request["message"]) + print("MockParentResponder: Recieved 'fail': message={0}".format(request["message"])) else: - print "MockParentResponder: Recieved {0}".format(message.name) + print("MockParentResponder: Recieved {0}".format(message.name)) # flush the output so it shows up in the parent process sys.stdout.flush() @@ -85,7 +79,7 @@ if __name__ == '__main__': raise ValueError("Usage: mock_tether_parent start_server port") SERVER_ADDRESS=(SERVER_ADDRESS[0],port) - print "mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1]) + print("mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1])) # flush the output so it shows up in the parent process sys.stdout.flush() diff --git a/lang/py/test/sample_http_client.py b/lang/py/test/sample_http_client.py index 62c91fd..02b8421 100644 --- a/lang/py/test/sample_http_client.py +++ b/lang/py/test/sample_http_client.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,8 +14,12 @@ # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import absolute_import, division, print_function + import sys from avro import ipc, protocol @@ -78,7 +83,7 @@ if __name__ == '__main__': # build the parameters for the request params = {} params['message'] = message - + # send the requests and print the result for msg_count in range(num_messages): requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL) diff --git a/lang/py/test/sample_http_server.py b/lang/py/test/sample_http_server.py index e412ab5..c680afb 100644 --- a/lang/py/test/sample_http_server.py +++ b/lang/py/test/sample_http_server.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,6 +17,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from avro import ipc, protocol diff --git a/lang/py/test/set_avro_test_path.py b/lang/py/test/set_avro_test_path.py index 8e47faf..fd395da 100644 --- a/lang/py/test/set_avro_test_path.py +++ b/lang/py/test/set_avro_test_path.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,6 +16,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + """ Module adjusts the path PYTHONPATH so the unittests will work even if an egg for AVRO is already installed. @@ -28,6 +32,9 @@ being built. To work around this the unittests import this module before importing AVRO. This module in turn adjusts the python path so that the test build of AVRO is higher on the path then any installed eggs. """ + +from __future__ import absolute_import, division, print_function + import os import sys diff --git a/lang/py/test/test_datafile.py b/lang/py/test/test_datafile.py index 2b7061c..bceb071 100644 --- a/lang/py/test/test_datafile.py +++ b/lang/py/test/test_datafile.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,14 +8,17 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import absolute_import, division, print_function + import os import unittest @@ -56,25 +62,30 @@ try: import snappy CODECS_TO_VALIDATE += ('snappy',) except ImportError: - print 'Snappy not present, will skip testing it.' + print('Snappy not present, will skip testing it.') +try: + import zstandard + CODECS_TO_VALIDATE += ('zstandard',) +except ImportError: + print('Zstandard not present, will skip testing it.') # TODO(hammer): clean up written files with ant, not os.remove class TestDataFile(unittest.TestCase): def test_round_trip(self): - print '' - print 'TEST ROUND TRIP' - print '===============' - print '' + print('') + print('TEST ROUND TRIP') + print('===============') + print('') correct = 0 for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE): for codec in CODECS_TO_VALIDATE: - print '' - print 'SCHEMA NUMBER %d' % (i + 1) - print '================' - print '' - print 'Schema: %s' % example_schema - print 'Datum: %s' % datum - print 'Codec: %s' % codec + print('') + print('SCHEMA NUMBER %d' % (i + 1)) + print('================') + print('') + print('Schema: %s' % example_schema) + print('Datum: %s' % datum) + print('Codec: %s' % codec) # write data in binary to file 10 times writer = open(FILENAME, 'wb') @@ -93,30 +104,30 @@ class TestDataFile(unittest.TestCase): for datum in dfr: round_trip_data.append(datum) - print 'Round Trip Data: %s' % round_trip_data - print 'Round Trip Data Length: %d' % len(round_trip_data) + print('Round Trip Data: %s' % round_trip_data) + print('Round Trip Data Length: %d' % len(round_trip_data)) is_correct = [datum] * 10 == round_trip_data if is_correct: correct += 1 - print 'Correct Round Trip: %s' % is_correct - print '' + print('Correct Round Trip: %s' % is_correct) + print('') os.remove(FILENAME) self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE)) def test_append(self): - print '' - print 'TEST APPEND' - print '===========' - print '' + print('') + print('TEST APPEND') + print('===========') + print('') correct = 0 for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE): for codec in CODECS_TO_VALIDATE: - print '' - print 'SCHEMA NUMBER %d' % (i + 1) - print '================' - print '' - print 'Schema: %s' % example_schema - print 'Datum: %s' % datum - print 'Codec: %s' % codec + print('') + print('SCHEMA NUMBER %d' % (i + 1)) + print('================') + print('') + print('Schema: %s' % example_schema) + print('Datum: %s' % datum) + print('Codec: %s' % codec) # write data in binary to file once writer = open(FILENAME, 'wb') @@ -141,12 +152,12 @@ class TestDataFile(unittest.TestCase): for datum in dfr: appended_data.append(datum) - print 'Appended Data: %s' % appended_data - print 'Appended Data Length: %d' % len(appended_data) + print('Appended Data: %s' % appended_data) + print('Appended Data Length: %d' % len(appended_data)) is_correct = [datum] * 10 == appended_data if is_correct: correct += 1 - print 'Correct Appended: %s' % is_correct - print '' + print('Correct Appended: %s' % is_correct) + print('') os.remove(FILENAME) self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE)) diff --git a/lang/py/test/test_datafile_interop.py b/lang/py/test/test_datafile_interop.py index ee02f99..329b9a1 100644 --- a/lang/py/test/test_datafile_interop.py +++ b/lang/py/test/test_datafile_interop.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,14 +8,17 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import absolute_import, division, print_function + import os import unittest @@ -22,20 +28,27 @@ from avro import datafile, io class TestDataFileInterop(unittest.TestCase): def test_interop(self): - print '' - print 'TEST INTEROP' - print '============' - print '' + print() + print('TEST INTEROP') + print('============') + print() for f in os.listdir('@INTEROP_DATA_DIR@'): - print 'READING %s' % f - print '' - - # read data in binary from file - reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb') - datum_reader = io.DatumReader() - dfr = datafile.DataFileReader(reader, datum_reader) - for datum in dfr: - assert datum is not None + base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1) + if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS: + print('READING %s' % f) + print('') + + # read data in binary from file + reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb') + datum_reader = io.DatumReader() + dfr = datafile.DataFileReader(reader, datum_reader) + i = 0 + for i, datum in enumerate(dfr, 1): + assert datum is not None + assert i > 0 + else: + print('SKIPPING %s due to an unsupported codec' % f) + print('') if __name__ == '__main__': unittest.main() diff --git a/lang/py/test/test_io.py b/lang/py/test/test_io.py index 533aa40..2d734a5 100644 --- a/lang/py/test/test_io.py +++ b/lang/py/test/test_io.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,14 +8,17 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import absolute_import, division, print_function + import datetime import unittest from binascii import hexlify @@ -27,7 +33,6 @@ except ImportError: from StringIO import StringIO - SCHEMAS_TO_VALIDATE = ( ('"null"', None), ('"boolean"', True), @@ -112,7 +117,7 @@ DEFAULT_VALUE_EXAMPLES = ( ('"string"', '"foo"', u'foo'), ('"bytes"', '"\u00FF\u00FF"', u'\xff\xff'), ('"int"', '5', 5), - ('"long"', '5', 5L), + ('"long"', '5', 5), ('"float"', '1.1', 1.1), ('"double"', '1.1', 1.1), ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', u'\xff\xff'), @@ -148,10 +153,10 @@ def avro_hexlify(reader): return ' '.join(bytes) def print_test_name(test_name): - print '' - print test_name - print '=' * len(test_name) - print '' + print('') + print(test_name) + print('=' * len(test_name)) + print('') def write_datum(datum, writers_schema): writer = StringIO() @@ -170,17 +175,17 @@ def check_binary_encoding(number_type): print_test_name('TEST BINARY %s ENCODING' % number_type.upper()) correct = 0 for datum, hex_encoding in BINARY_ENCODINGS: - print 'Datum: %d' % datum - print 'Correct Encoding: %s' % hex_encoding + print('Datum: %d' % datum) + print('Correct Encoding: %s' % hex_encoding) writers_schema = schema.parse('"%s"' % number_type.lower()) writer, encoder, datum_writer = write_datum(datum, writers_schema) writer.seek(0) hex_val = avro_hexlify(writer) - print 'Read Encoding: %s' % hex_val + print('Read Encoding: %s' % hex_val) if hex_encoding == hex_val: correct += 1 - print '' + print('') return correct def check_skip_number(number_type): @@ -188,7 +193,7 @@ def check_skip_number(number_type): correct = 0 for value_to_skip, hex_encoding in BINARY_ENCODINGS: VALUE_TO_READ = 6253 - print 'Value to Skip: %d' % value_to_skip + print('Value to Skip: %d' % value_to_skip) # write the value to skip and a known value writers_schema = schema.parse('"%s"' % number_type.lower()) @@ -204,11 +209,11 @@ def check_skip_number(number_type): datum_reader = io.DatumReader(writers_schema) read_value = datum_reader.read(decoder) - print 'Read Value: %d' % read_value + print('Read Value: %d' % read_value) if read_value == VALUE_TO_READ: correct += 1 - print '' + print('') return correct - + class TestIO(unittest.TestCase): # # BASIC FUNCTIONALITY @@ -218,10 +223,10 @@ class TestIO(unittest.TestCase): print_test_name('TEST VALIDATE') passed = 0 for example_schema, datum in SCHEMAS_TO_VALIDATE: - print 'Schema: %s' % example_schema - print 'Datum: %s' % datum + print('Schema: %s' % example_schema) + print('Datum: %s' % datum) validated = io.validate(schema.parse(example_schema), datum) - print 'Valid: %s' % validated + print('Valid: %s' % validated) if validated: passed += 1 self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE)) @@ -229,14 +234,14 @@ class TestIO(unittest.TestCase): print_test_name('TEST ROUND TRIP') correct = 0 for example_schema, datum in SCHEMAS_TO_VALIDATE: - print 'Schema: %s' % example_schema - print 'Datum: %s' % datum + print('Schema: %s' % example_schema) + print('Datum: %s' % datum) writers_schema = schema.parse(example_schema) writer, encoder, datum_writer = write_datum(datum, writers_schema) round_trip_datum = read_datum(writer, writers_schema) - print 'Round Trip Datum: %s' % round_trip_datum + print('Round Trip Datum: %s' % round_trip_datum) if isinstance(round_trip_datum, Decimal): round_trip_datum = round_trip_datum.to_eng_string() datum = str(datum) @@ -283,8 +288,8 @@ class TestIO(unittest.TestCase): readers_schema = schema.parse(rs) writer, enc, dw = write_datum(datum_to_write, writers_schema) datum_read = read_datum(writer, writers_schema, readers_schema) - print 'Writer: %s Reader: %s' % (writers_schema, readers_schema) - print 'Datum Read: %s' % datum_read + print('Writer: %s Reader: %s' % (writers_schema, readers_schema)) + print('Datum Read: %s' % datum_read) if datum_read != datum_to_write: incorrect += 1 self.assertEquals(incorrect, 0) @@ -320,7 +325,7 @@ class TestIO(unittest.TestCase): writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema) datum_read = read_datum(writer, writers_schema, readers_schema) - print 'Datum Read: %s' % datum_read + print('Datum Read: %s' % datum_read) if datum_to_read == datum_read: correct += 1 self.assertEquals(correct, len(DEFAULT_VALUE_EXAMPLES)) @@ -352,7 +357,7 @@ class TestIO(unittest.TestCase): writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema) datum_read = read_datum(writer, writers_schema, readers_schema) - print 'Datum Read: %s' % datum_read + print('Datum Read: %s' % datum_read) self.assertEquals(datum_to_read, datum_read) def test_field_order(self): @@ -368,7 +373,7 @@ class TestIO(unittest.TestCase): writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema) datum_read = read_datum(writer, writers_schema, readers_schema) - print 'Datum Read: %s' % datum_read + print('Datum Read: %s' % datum_read) self.assertEquals(datum_to_read, datum_read) def test_type_exception(self): diff --git a/lang/py/test/test_ipc.py b/lang/py/test/test_ipc.py index 575a0c9..bc9bd21 100644 --- a/lang/py/test/test_ipc.py +++ b/lang/py/test/test_ipc.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,10 +16,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + """ There are currently no IPC tests within python, in part because there are no servers yet available. """ + +from __future__ import absolute_import, division, print_function + import unittest import set_avro_test_path diff --git a/lang/py/test/test_script.py b/lang/py/test/test_script.py index 214fc15..bd0cb4d 100644 --- a/lang/py/test/test_script.py +++ b/lang/py/test/test_script.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -5,15 +8,17 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # https://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + import csv import json import unittest @@ -125,7 +130,7 @@ class TestCat(unittest.TestCase): def test_json_pretty(self): out = self._run("--format", "json-pretty", "-n", "1", raw=1) - assert out.strip() == _JSON_PRETTY.strip() + self.assertEqual(out.strip(), _JSON_PRETTY.strip()) def test_version(self): check_output([SCRIPT, "cat", "--version"]) diff --git a/lang/py/test/test_tether_task.py b/lang/py/test/test_tether_task.py index 9933070..85ed9cb 100644 --- a/lang/py/test/test_tether_task.py +++ b/lang/py/test/test_tether_task.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,15 +17,22 @@ # See the License for the specific language governing permissions and # limitations under the License. - +from __future__ import absolute_import, division, print_function import os +import StringIO import subprocess import sys import time import unittest +import avro.tether.tether_task +import avro.tether.util +import mock_tether_parent import set_avro_test_path +from avro import io as avio +from avro import schema, tether +from word_count_task import WordCountTask class TestTetherTask(unittest.TestCase): @@ -34,15 +44,6 @@ class TestTetherTask(unittest.TestCase): Test that the thether_task is working. We run the mock_tether_parent in a separate subprocess """ - from avro import tether - from avro import io as avio - from avro import schema - from avro.tether import HTTPRequestor,inputProtocol, find_port - - import StringIO - import mock_tether_parent - from word_count_task import WordCountTask - task=WordCountTask() proc=None @@ -51,13 +52,13 @@ class TestTetherTask(unittest.TestCase): # env["AVRO_TETHER_OUTPUT_PORT"]=output_port env=dict() env["PYTHONPATH"]=':'.join(sys.path) - server_port=find_port() + server_port = avro.tether.util.find_port() pyfile=mock_tether_parent.__file__ proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)]) - input_port=find_port() + input_port = avro.tether.util.find_port() - print "Mock server started process pid={0}".format(proc.pid) + print("Mock server started process pid={0}".format(proc.pid)) # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started # so we give the subprocess time to start up time.sleep(1) @@ -68,7 +69,11 @@ class TestTetherTask(unittest.TestCase): #*************************************************************** # Test the mapper - task.configure(tether.TaskType.MAP,str(task.inschema),str(task.midschema)) + task.configure( + avro.tether.tether_task.TaskType.MAP, + str(task.inschema), + str(task.midschema) + ) # Serialize some data so we can send it to the input function datum="This is a line of text" @@ -84,7 +89,11 @@ class TestTetherTask(unittest.TestCase): task.input(data,1) # Test the reducer - task.configure(tether.TaskType.REDUCE,str(task.midschema),str(task.outschema)) + task.configure( + avro.tether.tether_task.TaskType.REDUCE, + str(task.midschema), + str(task.outschema) + ) # Serialize some data so we can send it to the input function datum={"key":"word","value":2} diff --git a/lang/py/test/test_tether_task_runner.py b/lang/py/test/test_tether_task_runner.py index 3832dbe..985eb3c 100644 --- a/lang/py/test/test_tether_task_runner.py +++ b/lang/py/test/test_tether_task_runner.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,28 +17,29 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + +import logging import os +import StringIO import subprocess import sys import time import unittest +import avro.tether.tether_task +import avro.tether.tether_task_runner +import avro.tether.util +import mock_tether_parent import set_avro_test_path +from avro import io as avio +from word_count_task import WordCountTask class TestTetherTaskRunner(unittest.TestCase): - """ unit test for a tethered task runner. - """ + """unit test for a tethered task runner.""" def test1(self): - from word_count_task import WordCountTask - from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType - from avro import io as avio - import mock_tether_parent - import subprocess - import StringIO - import logging - # set the logging level to debug so that debug messages are printed logging.basicConfig(level=logging.DEBUG) @@ -44,30 +48,34 @@ class TestTetherTaskRunner(unittest.TestCase): # launch the server in a separate process env=dict() env["PYTHONPATH"]=':'.join(sys.path) - parent_port=find_port() + parent_port = avro.tether.util.find_port() pyfile=mock_tether_parent.__file__ proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)]) - input_port=find_port() + input_port = avro.tether.util.find_port() - print "Mock server started process pid={0}".format(proc.pid) + print("Mock server started process pid={0}".format(proc.pid)) # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started # so we give the subprocess time to start up time.sleep(1) - runner=TaskRunner(WordCountTask()) + runner = avro.tether.tether_task_runner.TaskRunner(WordCountTask()) runner.start(outputport=parent_port,join=False) - # Test sending various messages to the server and ensuring they are - # processed correctly - requestor=HTTPRequestor("localhost",runner.server.server_address[1],inputProtocol) + # Test sending various messages to the server and ensuring they are processed correctly + requestor = avro.tether.tether_task.HTTPRequestor( + "localhost", runner.server.server_address[1], avro.tether.tether_task.inputProtocol) # TODO: We should validate that open worked by grabbing the STDOUT of the subproces # and ensuring that it outputted the correct message. # Test the mapper - requestor.request("configure",{"taskType":TaskType.MAP,"inSchema":str(runner.task.inschema),"outSchema":str(runner.task.midschema)}) + requestor.request("configure", { + "taskType": avro.tether.tether_task.TaskType.MAP, + "inSchema": str(runner.task.inschema), + "outSchema": str(runner.task.midschema) + }) # Serialize some data so we can send it to the input function datum="This is a line of text" @@ -83,8 +91,12 @@ class TestTetherTaskRunner(unittest.TestCase): # Call input to simulate calling map requestor.request("input",{"data":data,"count":1}) - #Test the reducer - requestor.request("configure",{"taskType":TaskType.REDUCE,"inSchema":str(runner.task.midschema),"outSchema":str(runner.task.outschema)}) + # Test the reducer + requestor.request("configure", { + "taskType": avro.tether.tether_task.TaskType.REDUCE, + "inSchema": str(runner.task.midschema), + "outSchema": str(runner.task.outschema)} + ) #Serialize some data so we can send it to the input function datum={"key":"word","value":2} @@ -133,15 +145,6 @@ class TestTetherTaskRunner(unittest.TestCase): as our main script everything works as expected. We do this by using subprocess to run it in a separate thread. """ - from word_count_task import WordCountTask - from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType - from avro.tether import tether_task_runner - from avro import io as avio - import mock_tether_parent - import subprocess - import StringIO - - proc=None runnerproc=None @@ -149,7 +152,7 @@ class TestTetherTaskRunner(unittest.TestCase): #launch the server in a separate process env=dict() env["PYTHONPATH"]=':'.join(sys.path) - parent_port=find_port() + parent_port = avro.tether.util.find_port() pyfile=mock_tether_parent.__file__ proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)]) @@ -164,14 +167,14 @@ class TestTetherTaskRunner(unittest.TestCase): env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)} env["PYTHONPATH"]=':'.join(sys.path) - runnerproc=subprocess.Popen(["python",tether_task_runner.__file__,"word_count_task.WordCountTask"],env=env) + runnerproc = subprocess.Popen(["python", avro.tether.tether_task_runner.__file__, "word_count_task.WordCountTask"],env=env) #possible race condition wait for the process to start time.sleep(1) - print "Mock server started process pid={0}".format(proc.pid) + print("Mock server started process pid={0}".format(proc.pid)) #Possible race condition? open tries to connect to the subprocess before the subprocess is fully started #so we give the subprocess time to start up time.sleep(1) diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py index d2f1858..8c3fb08 100644 --- a/lang/py/test/test_tether_word_count.py +++ b/lang/py/test/test_tether_word_count.py @@ -1,3 +1,6 @@ +#!/usr/bin/env python + +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,6 +17,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + import inspect import os import subprocess @@ -25,8 +30,7 @@ import set_avro_test_path class TestTetherWordCount(unittest.TestCase): - """ unittest for a python tethered map-reduce job. - """ + """unittest for a python tethered map-reduce job.""" def _write_lines(self,lines,fname): """ @@ -72,7 +76,7 @@ class TestTetherWordCount(unittest.TestCase): words=line.split() for w in words: - if not(counts.has_key(w.strip())): + if not(w.strip() in counts): counts[w.strip()]=0 counts[w.strip()]=counts[w.strip()]+1 @@ -92,7 +96,6 @@ class TestTetherWordCount(unittest.TestCase): import avro import subprocess - import StringIO import shutil import tempfile import inspect @@ -182,11 +185,11 @@ python -m avro.tether.tether_task_runner word_count_task.WordCountTask exhf.close() # make it world executable - os.chmod(exfile,0755) + os.chmod(exfile,0o755) args.extend(["--program",exfile]) - print "Command:\n\t{0}".format(" ".join(args)) + print("Command:\n\t{0}".format(" ".join(args))) proc=subprocess.Popen(args) diff --git a/lang/py/test/txsample_http_client.py b/lang/py/test/txsample_http_client.py index dba4ade..28c2c28 100644 --- a/lang/py/test/txsample_http_client.py +++ b/lang/py/test/txsample_http_client.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -13,8 +14,12 @@ # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# # See the License for the specific language governing permissions and # limitations under the License. + +from __future__ import absolute_import, division, print_function + import sys from avro import protocol, txipc diff --git a/lang/py/test/txsample_http_server.py b/lang/py/test/txsample_http_server.py index 604ef54..fafaecd 100644 --- a/lang/py/test/txsample_http_server.py +++ b/lang/py/test/txsample_http_server.py @@ -1,5 +1,6 @@ #!/usr/bin/env python +## # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -16,6 +17,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, division, print_function + from avro import ipc, protocol, txipc from twisted.internet import reactor from twisted.web import server diff --git a/lang/py/test/word_count_task.py b/lang/py/test/word_count_task.py index 24f2da2..8181340 100644 --- a/lang/py/test/word_count_task.py +++ b/lang/py/test/word_count_task.py @@ -1,33 +1,36 @@ -""" - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -""" - -__all__=["WordCountTask"] +#!/usr/bin/env python + +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, division, print_function import logging -from avro.tether import TetherTask +import avro.tether.tether_task + +__all__ = ["WordCountTask"] #TODO::Make the logging level a parameter we can set #logging.basicConfig(level=logging.INFO) -class WordCountTask(TetherTask): +class WordCountTask(avro.tether.tether_task.TetherTask): """ - Implements the mappper and reducer for the word count example + Implements the mapper and reducer for the word count example """ def __init__(self): @@ -40,7 +43,7 @@ class WordCountTask(TetherTask): {"name":"value","type":"long","order":"ignore"}] }""" outschema=midschema - TetherTask.__init__(self,inschema,midschema,outschema) + avro.tether.tether_task.TetherTask.__init__(self, inschema, midschema, outschema) #keep track of the partial sums of the counts
