sunjincheng121 commented on a change in pull request #8420: [FLINK-12408][python] Allow to define the data types in Python URL: https://github.com/apache/flink/pull/8420#discussion_r286745716
########## File path: flink-python/pyflink/table/tests/test_types.py ########## @@ -0,0 +1,747 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import array +import ctypes +import datetime +import pickle +import sys +import unittest + +from pyflink.table.types import (_infer_schema_from_data, _infer_type, + _array_signed_int_typecode_ctype_mappings, + _array_unsigned_int_typecode_ctype_mappings, + _array_type_mappings, _merge_type, + _create_type_verifier, UserDefinedType, DataTypes, Row, RowField, + RowType, ArrayType, BigIntType, VarCharType, MapType) + + +class ExamplePointUDT(UserDefinedType): + """ + User-defined type (UDT) for ExamplePoint. + """ + + @classmethod + def sql_type(cls): + return DataTypes.ARRAY(DataTypes.DOUBLE(False)) + + @classmethod + def module(cls): + return 'pyflink.table.tests.test_types' + + @classmethod + def java_udt(cls): + return 'org.apache.flink.table.types.python.ExamplePointUserDefinedType' + + def serialize(self, obj): + return [obj.x, obj.y] + + def deserialize(self, datum): + return ExamplePoint(datum[0], datum[1]) + + +class ExamplePoint: + """ + An example class to demonstrate UDT in Java, and Python. + """ + + __UDT__ = ExamplePointUDT() + + def __init__(self, x, y): + self.x = x + self.y = y + + def __repr__(self): + return "ExamplePoint(%s,%s)" % (self.x, self.y) + + def __str__(self): + return "(%s,%s)" % (self.x, self.y) + + def __eq__(self, other): + return isinstance(other, self.__class__) and \ + other.x == self.x and other.y == self.y + + +class PythonOnlyUDT(UserDefinedType): + """ + User-defined type (UDT) for ExamplePoint. + """ + + @classmethod + def sql_type(cls): + return DataTypes.ARRAY(DataTypes.DOUBLE(False)) + + @classmethod + def module(cls): + return '__main__' + + def serialize(self, obj): + return [obj.x, obj.y] + + def deserialize(self, datum): + return PythonOnlyPoint(datum[0], datum[1]) + + @staticmethod + def foo(): + pass + + @property + def props(self): + return {} + + +class PythonOnlyPoint(ExamplePoint): + """ + An example class to demonstrate UDT in only Python + """ + __UDT__ = PythonOnlyUDT() + + +class TypesTests(unittest.TestCase): + + def test_infer_schema(self): + from decimal import Decimal + + class A(object): + def __init__(self): + self.a = 1 + + from collections import namedtuple + Point = namedtuple('Point', 'x y') + + data = [ + True, + 1, + "a", + u"a", + datetime.date(1970, 1, 1), + datetime.time(0, 0, 0), + datetime.datetime(1970, 1, 1, 0, 0), + 1.0, + array.array("d", [1]), + [1], + (1, ), + Point(1.0, 5.0), + {"a": 1}, + bytearray(1), + Decimal(1), + Row(a=1), + Row("a")(1), + A(), + ] + + expected = [ + 'BooleanType(true)', + 'BigIntType(true)', + 'VarCharType(2147483647, true)', + 'VarCharType(2147483647, true)', + 'DateType(true)', + 'TimeType(0, true)', + 'TimestampType(0, 6, true)', + 'DoubleType(true)', + "ArrayType(DoubleType(false), true)", + "ArrayType(BigIntType(true), true)", + 'RowType(RowField(_1, BigIntType(true), ...))', + 'RowType(RowField(x, DoubleType(true), ...),RowField(y, DoubleType(true), ...))', + 'MapType(VarCharType(2147483647, false), BigIntType(true), true)', + 'VarBinaryType(2147483647, true)', + 'DecimalType(38, 18, true)', + 'RowType(RowField(a, BigIntType(true), ...))', + 'RowType(RowField(a, BigIntType(true), ...))', + 'RowType(RowField(a, BigIntType(true), ...))', + ] + + schema = _infer_schema_from_data([data]) + self.assertEqual(expected, [str(f.data_type) for f in schema.fields]) + + def test_infer_schema_nulltype(self): + elements = [Row(c1=[], c2={}, c3=None), + Row(c1=[Row(a=1, b='s')], c2={"key": Row(c=1.0, d="2")}, c3="")] + schema = _infer_schema_from_data(elements) + self.assertTrue(isinstance(schema, RowType)) + self.assertEqual(3, len(schema.fields)) + + # first column is array + self.assertTrue(isinstance(schema.fields[0].data_type, ArrayType)) + + # element type of first column is struct + self.assertTrue(isinstance(schema.fields[0].data_type.element_type, RowType)) + + self.assertTrue(isinstance(schema.fields[0].data_type.element_type.fields[0].data_type, + BigIntType)) + self.assertTrue(isinstance(schema.fields[0].data_type.element_type.fields[1].data_type, + VarCharType)) + + # second column is map + self.assertTrue(isinstance(schema.fields[1].data_type, MapType)) + self.assertTrue(isinstance(schema.fields[1].data_type.key_type, VarCharType)) + self.assertTrue(isinstance(schema.fields[1].data_type.value_type, RowType)) + + # third column is varchar + self.assertTrue(isinstance(schema.fields[2].data_type, VarCharType)) + + def test_infer_schema_not_enough_names(self): + schema = _infer_schema_from_data([["a", "b"]], ["col1"]) + self.assertTrue(schema.names, ['col1', '_2']) + + def test_infer_schema_fails(self): + with self.assertRaisesRegexp(TypeError, 'field a'): + _infer_schema_from_data([[1, 1], ["x", 1]], names=["a", "b"]) + + def test_infer_nested_schema(self): + NestedRow = Row("f1", "f2") + data1 = [NestedRow([1, 2], {"row1": 1.0}), NestedRow([2, 3], {"row2": 2.0})] + schema1 = _infer_schema_from_data(data1) + expected1 = [ + 'ArrayType(BigIntType(true), true)', + 'MapType(VarCharType(2147483647, false), DoubleType(true), true)' + ] + self.assertEqual(expected1, [str(f.data_type) for f in schema1.fields]) + + data2 = [NestedRow([[1, 2], [2, 3]], [1, 2]), NestedRow([[2, 3], [3, 4]], [2, 3])] + schema2 = _infer_schema_from_data(data2) + expected2 = [ + 'ArrayType(ArrayType(BigIntType(true), true), true)', + 'ArrayType(BigIntType(true), true)' + ] + self.assertEqual(expected2, [str(f.data_type) for f in schema2.fields]) + + def test_convert_row_to_dict(self): + row = Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")}) + self.assertEqual(1, row.as_dict()['l'][0].a) + self.assertEqual(1.0, row.as_dict()['d']['key'].c) + + def test_udt(self): + p = ExamplePoint(1.0, 2.0) + self.assertEqual(_infer_type(p), ExamplePointUDT()) + _create_type_verifier(ExamplePointUDT())(ExamplePoint(1.0, 2.0)) + self.assertRaises(ValueError, lambda: _create_type_verifier(ExamplePointUDT())([1.0, 2.0])) + + p = PythonOnlyPoint(1.0, 2.0) + self.assertEqual(_infer_type(p), PythonOnlyUDT()) + _create_type_verifier(PythonOnlyUDT())(PythonOnlyPoint(1.0, 2.0)) + self.assertRaises(ValueError, lambda: _create_type_verifier(PythonOnlyUDT())([1.0, 2.0])) + + def test_nested_udt_in_df(self): + expected_schema = DataTypes.ROW()\ + .add("_1", DataTypes.BIGINT()).add("_2", DataTypes.ARRAY(PythonOnlyUDT())) + data = (1, [PythonOnlyPoint(float(1), float(2))]) + self.assertEqual(expected_schema, _infer_type(data)) + + expected_schema = DataTypes.ROW().add("_1", DataTypes.BIGINT()).add( + "_2", DataTypes.MAP(DataTypes.BIGINT(False), PythonOnlyUDT())) + p = (1, {1: PythonOnlyPoint(1, float(2))}) + self.assertEqual(expected_schema, _infer_type(p)) + + def test_struct_type(self): + row1 = DataTypes.ROW().add("f1", DataTypes.VARCHAR(nullable=True))\ + .add("f2", DataTypes.VARCHAR(nullable=True)) + row2 = DataTypes.ROW([DataTypes.FIELD("f1", DataTypes.VARCHAR(nullable=True)), + DataTypes.FIELD("f2", DataTypes.VARCHAR(nullable=True), None)]) + self.assertEqual(row1.field_names(), row2.names) + self.assertEqual(row1, row2) + + row1 = DataTypes.ROW().add("f1", DataTypes.VARCHAR(nullable=True))\ + .add("f2", DataTypes.VARCHAR(nullable=True)) + row2 = DataTypes.ROW([DataTypes.FIELD("f1", DataTypes.VARCHAR(nullable=True))]) + self.assertNotEqual(row1.field_names(), row2.names) + self.assertNotEqual(row1, row2) + + row1 = (DataTypes.ROW().add(DataTypes.FIELD("f1", DataTypes.VARCHAR(nullable=True))) + .add("f2", DataTypes.VARCHAR(nullable=True))) + row2 = DataTypes.ROW([DataTypes.FIELD("f1", DataTypes.VARCHAR(nullable=True)), + DataTypes.FIELD("f2", DataTypes.VARCHAR(nullable=True))]) + self.assertEqual(row1.field_names(), row2.names) + self.assertEqual(row1, row2) + + row1 = (DataTypes.ROW().add(DataTypes.FIELD("f1", DataTypes.VARCHAR(nullable=True))) + .add("f2", DataTypes.VARCHAR(nullable=True))) + row2 = DataTypes.ROW([DataTypes.FIELD("f1", DataTypes.VARCHAR(nullable=True))]) + self.assertNotEqual(row1.field_names(), row2.names) + self.assertNotEqual(row1, row2) + + # Catch exception raised during improper construction + self.assertRaises(ValueError, lambda: DataTypes.ROW().add("name")) + + row1 = DataTypes.ROW().add("f1", DataTypes.VARCHAR(nullable=True))\ + .add("f2", DataTypes.VARCHAR(nullable=True)) + for field in row1: + self.assertIsInstance(field, RowField) + + row1 = DataTypes.ROW().add("f1", DataTypes.VARCHAR(nullable=True))\ + .add("f2", DataTypes.VARCHAR(nullable=True)) + self.assertEqual(len(row1), 2) + + row1 = DataTypes.ROW().add("f1", DataTypes.VARCHAR(nullable=True))\ + .add("f2", DataTypes.VARCHAR(nullable=True)) + self.assertIs(row1["f1"], row1.fields[0]) + self.assertIs(row1[0], row1.fields[0]) + self.assertEqual(row1[0:1], DataTypes.ROW(row1.fields[0:1])) + self.assertRaises(KeyError, lambda: row1["f9"]) + self.assertRaises(IndexError, lambda: row1[9]) + self.assertRaises(TypeError, lambda: row1[9.9]) + + def test_infer_bigint_type(self): + longrow = [Row(f1='a', f2=100000000000000)] + schema = _infer_schema_from_data(longrow) + self.assertEqual(DataTypes.BIGINT(), schema.fields[1].data_type) + self.assertEqual(DataTypes.BIGINT(), _infer_type(1)) + self.assertEqual(DataTypes.BIGINT(), _infer_type(2**10)) + self.assertEqual(DataTypes.BIGINT(), _infer_type(2**20)) + self.assertEqual(DataTypes.BIGINT(), _infer_type(2**31 - 1)) + self.assertEqual(DataTypes.BIGINT(), _infer_type(2**31)) + self.assertEqual(DataTypes.BIGINT(), _infer_type(2**61)) + self.assertEqual(DataTypes.BIGINT(), _infer_type(2**71)) + + def test_merge_type(self): + self.assertEqual(_merge_type(DataTypes.BIGINT(), DataTypes.NULL()), DataTypes.BIGINT()) + self.assertEqual(_merge_type(DataTypes.NULL(), DataTypes.BIGINT()), DataTypes.BIGINT()) + + self.assertEqual(_merge_type(DataTypes.BIGINT(), DataTypes.BIGINT()), DataTypes.BIGINT()) + + self.assertEqual(_merge_type( + DataTypes.ARRAY(DataTypes.BIGINT()), + DataTypes.ARRAY(DataTypes.BIGINT()) + ), DataTypes.ARRAY(DataTypes.BIGINT())) + with self.assertRaisesRegexp(TypeError, 'element in array'): + _merge_type(DataTypes.ARRAY(DataTypes.BIGINT()), DataTypes.ARRAY(DataTypes.DOUBLE())) + + self.assertEqual(_merge_type( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT()), + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT()) + ), DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())) + with self.assertRaisesRegexp(TypeError, 'key of map'): + _merge_type( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT()), + DataTypes.MAP(DataTypes.DOUBLE(), DataTypes.BIGINT())) + with self.assertRaisesRegexp(TypeError, 'value of map'): + _merge_type( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT()), + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.DOUBLE())) + + self.assertEqual(_merge_type( + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.BIGINT()), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]), + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.BIGINT()), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]) + ), DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.BIGINT()), + DataTypes.FIELD('f2', DataTypes.VARCHAR())])) + with self.assertRaisesRegexp(TypeError, 'field f1'): + _merge_type( + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.BIGINT()), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]), + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.DOUBLE()), + DataTypes.FIELD('f2', DataTypes.VARCHAR())])) + + self.assertEqual(_merge_type( + DataTypes.ROW([DataTypes.FIELD( + 'f1', DataTypes.ROW([DataTypes.FIELD('f2', DataTypes.BIGINT())]))]), + DataTypes.ROW([DataTypes.FIELD( + 'f1', DataTypes.ROW([DataTypes.FIELD('f2', DataTypes.BIGINT())]))]) + ), DataTypes.ROW([DataTypes.FIELD( + 'f1', DataTypes.ROW([DataTypes.FIELD('f2', DataTypes.BIGINT())]))])) + with self.assertRaisesRegexp(TypeError, 'field f2 in field f1'): + _merge_type( + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ROW( + [DataTypes.FIELD('f2', DataTypes.BIGINT())]))]), + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ROW( + [DataTypes.FIELD('f2', DataTypes.VARCHAR())]))])) + + self.assertEqual(_merge_type( + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY(DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]), + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY(DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]) + ), DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY(DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())])) + with self.assertRaisesRegexp(TypeError, 'element in array field f1'): + _merge_type( + DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.ARRAY(DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]), + DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.ARRAY(DataTypes.DOUBLE())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())])) + + self.assertEqual(_merge_type( + DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]), + DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]) + ), DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())])) + with self.assertRaisesRegexp(TypeError, 'value of map field f1'): + _merge_type( + DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())]), + DataTypes.ROW([ + DataTypes.FIELD('f1', DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.DOUBLE())), + DataTypes.FIELD('f2', DataTypes.VARCHAR())])) + + self.assertEqual(_merge_type( + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())))]), + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())))]) + ), DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())))])) + with self.assertRaisesRegexp(TypeError, 'key of map element in array field f1'): + _merge_type( + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY( + DataTypes.MAP(DataTypes.VARCHAR(), DataTypes.BIGINT())))]), + DataTypes.ROW([DataTypes.FIELD('f1', DataTypes.ARRAY( + DataTypes.MAP(DataTypes.DOUBLE(), DataTypes.BIGINT())))]) + ) + + def test_array_types(self): + # This test need to make sure that the Scala type selected is at least + # as large as the python's types. This is necessary because python's + # array types depend on C implementation on the machine. Therefore there + # is no machine independent correspondence between python's array types + # and Scala types. + # See: https://docs.python.org/2/library/array.html + + def assert_collect_success(typecode, value, element_type): + self.assertEqual(element_type, + str(_infer_type(array.array(typecode, [value])).element_type)) + + # supported string types + # + # String types in python's array are "u" for Py_UNICODE and "c" for char. + # "u" will be removed in python 4, and "c" is not supported in python 3. + supported_string_types = [] + if sys.version_info[0] < 4: + supported_string_types += ['u'] + # test unicode + assert_collect_success('u', u'a', 'CharType(4, false)') + if sys.version_info[0] < 3: + supported_string_types += ['c'] + # test string + assert_collect_success('c', 'a', 'CharType(1, false)') + + # supported float and double + # + # Test max, min, and precision for float and double, assuming IEEE 754 + # floating-point format. + supported_fractional_types = ['f', 'd'] + assert_collect_success('f', ctypes.c_float(1e+38).value, 'FloatType(false)') + assert_collect_success('f', ctypes.c_float(1e-38).value, 'FloatType(false)') + assert_collect_success('f', ctypes.c_float(1.123456).value, 'FloatType(false)') + assert_collect_success('d', sys.float_info.max, 'DoubleType(false)') + assert_collect_success('d', sys.float_info.min, 'DoubleType(false)') + assert_collect_success('d', sys.float_info.epsilon, 'DoubleType(false)') + + def get_int_data_type(size): + if size <= 8: + return "TinyIntType(false)" + if size <= 16: + return "SmallIntType(false)" + if size <= 32: + return "IntType(false)" + if size <= 64: + return "BigIntType(false)" + + # supported signed int types + # + # The size of C types changes with implementation, we need to make sure + # that there is no overflow error on the platform running this test. + supported_signed_int_types = list( + set(_array_signed_int_typecode_ctype_mappings.keys()) + .intersection(set(_array_type_mappings.keys()))) + for t in supported_signed_int_types: + ctype = _array_signed_int_typecode_ctype_mappings[t] + max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) + assert_collect_success(t, max_val - 1, get_int_data_type(ctypes.sizeof(ctype) * 8)) + assert_collect_success(t, -max_val, get_int_data_type(ctypes.sizeof(ctype) * 8)) + + # supported unsigned int types + # + # JVM does not have unsigned types. We need to be very careful to make + # sure that there is no overflow error. + supported_unsigned_int_types = list( + set(_array_unsigned_int_typecode_ctype_mappings.keys()) + .intersection(set(_array_type_mappings.keys()))) + for t in supported_unsigned_int_types: + ctype = _array_unsigned_int_typecode_ctype_mappings[t] + max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) + assert_collect_success(t, max_val, get_int_data_type(ctypes.sizeof(ctype) * 8 + 1)) + + # all supported types + # + # Make sure the types tested above: + # 1. are all supported types + # 2. cover all supported types + supported_types = (supported_string_types + + supported_fractional_types + + supported_signed_int_types + + supported_unsigned_int_types) + self.assertEqual(set(supported_types), set(_array_type_mappings.keys())) + + # all unsupported types + # + # Keys in _array_type_mappings is a complete list of all supported types, + # and types not in _array_type_mappings are considered unsupported. + # `array.typecodes` are not supported in python 2. + if sys.version_info[0] < 3: + all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) + else: + all_types = set(array.typecodes) + unsupported_types = all_types - set(supported_types) + # test unsupported types + for t in unsupported_types: + with self.assertRaises(TypeError): + _infer_schema_from_data([Row(myarray=array.array(t))]) + + def test_data_type_eq(self): + lt = DataTypes.BIGINT() + lt2 = pickle.loads(pickle.dumps(DataTypes.BIGINT())) + self.assertEqual(lt, lt2) + + def test_decimal_type(self): + t1 = DataTypes.DECIMAL() + t2 = DataTypes.DECIMAL(10, 2) + self.assertTrue(t2 is not t1) + self.assertNotEqual(t1, t2) + t3 = DataTypes.DECIMAL(8) + self.assertNotEqual(t2, t3) + + def test_datetype_equal_zero(self): + dt = DataTypes.DATE() + self.assertEqual(dt.from_sql_type(0), datetime.date(1970, 1, 1)) + + def test_timestamp_microsecond(self): + tst = DataTypes.TIMESTAMP() + self.assertEqual(tst.to_sql_type(datetime.datetime.max) % 1000000, 999999) + + def test_empty_row(self): + row = Row() + self.assertEqual(len(row), 0) + + def test_invalid_create_row(self): + row_class = Row("c1", "c2") + self.assertRaises(ValueError, lambda: row_class(1, 2, 3)) + + +class DataTypeVerificationTests(unittest.TestCase): + + def test_verify_type_exception_msg(self): + self.assertRaisesRegexp( + ValueError, + "test_name", + lambda: _create_type_verifier( + DataTypes.VARCHAR(nullable=False), name="test_name")(None)) + + schema = DataTypes.ROW( + [DataTypes.FIELD('a', DataTypes.ROW([DataTypes.FIELD('b', DataTypes.INT())]))]) + self.assertRaisesRegexp( + TypeError, + "field b in field a", + lambda: _create_type_verifier(schema)([["data"]])) + + def test_verify_type_ok_nullable(self): + obj = None + types = [DataTypes.INT(), DataTypes.FLOAT(), DataTypes.VARCHAR(), DataTypes.ROW([])] + for data_type in types: + try: + _create_type_verifier(data_type)(obj) + except Exception: Review comment: except (TypeError, ValueError) ? and other place as same comment. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
