sunjincheng121 commented on a change in pull request #8420: 
[FLINK-12408][python] Allow to define the data types in Python
URL: https://github.com/apache/flink/pull/8420#discussion_r286788480
 
 

 ##########
 File path: flink-python/pyflink/table/types.py
 ##########
 @@ -16,171 +16,1811 @@
 # limitations under the License.
 
################################################################################
 
+import calendar
+import ctypes
+import datetime
+import decimal
 import sys
+import time
+from array import array
+from copy import copy
+from functools import reduce
+from threading import RLock
 
-if sys.version > '3':
-    xrange = range
+from pyflink.util.utils import to_jarray
+from pyflink.java_gateway import get_gateway
 
-__all__ = ['DataTypes']
+if sys.version >= '3':
+    long = int
+    basestring = unicode = str
+
+__all__ = ['DataTypes', 'UserDefinedType', 'Row']
 
 
 class DataType(object):
     """
     Base class for data types.
+
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
-    @classmethod
-    def type_name(cls):
-        return cls.__name__[:-4].lower()
+
+    def __init__(self, nullable=True):
+        self.nullable = nullable
+        self.conversion_cls = ''
+
+    def __repr__(self):
+        return '%s(%s)' % (self.__class__.__name__, str(self.nullable).lower())
 
     def __hash__(self):
-        return hash(self.type_name())
+        return hash(str(self))
 
     def __eq__(self, other):
-        return self.type_name() == other.type_name()
+        return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
 
     def __ne__(self, other):
-        return self.type_name() != other.type_name()
+        return not self.__eq__(other)
 
+    def not_null(self):
+        cp = copy(self)
+        cp.nullable = False
+        return cp
 
-class DataTypeSingleton(type):
-    """
-    Metaclass for DataType
-    """
+    def nullable(self):
+        cp = copy(self)
+        cp.nullable = True
+        return cp
+
+    @classmethod
+    def type_name(cls):
+        return cls.__name__[:-4].upper()
+
+    def bridged_to(self, conversion_cls):
+        """
+        Adds a hint that data should be represented using the given class when 
entering or leaving
+        the table ecosystem.
+
+        :param conversion_cls: the string representation of the conversion 
class
+        """
+        self.conversion_cls = conversion_cls
 
-    _instances = {}
+    def need_conversion(self):
+        """
+        Does this type need to conversion between Python object and internal 
SQL object.
 
-    def __call__(cls):
-        if cls not in cls._instances:
-            cls._instances[cls] = super(DataTypeSingleton, cls).__call__()
-        return cls._instances[cls]
+        This is used to avoid the unnecessary conversion for 
ArrayType/MultisetType/MapType/RowType.
+        """
+        return False
+
+    def to_sql_type(self, obj):
+        """
+        Converts a Python object into an internal SQL object.
+        """
+        return obj
+
+    def from_sql_type(self, obj):
+        """
+        Converts an internal SQL object into a native Python object.
+        """
+        return obj
 
 
 class AtomicType(DataType):
     """
     An internal type used to represent everything that is not
-    null, arrays, structs, and maps.
+    arrays, rows, and maps.
+    """
+
+    def __init__(self, nullable=True):
+        super(AtomicType, self).__init__(nullable)
+
+
+class NullType(AtomicType):
     """
+    Null type.
+
+    The data type representing None.
+    """
+
+    def __init__(self):
+        super(NullType, self).__init__(True)
 
 
 class NumericType(AtomicType):
     """
     Numeric data types.
     """
 
+    def __init__(self, nullable=True):
+        super(NumericType, self).__init__(nullable)
+
 
 class IntegralType(NumericType):
     """
     Integral data types.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(IntegralType, self).__init__(nullable)
 
 
 class FractionalType(NumericType):
     """
     Fractional data types.
     """
 
+    def __init__(self, nullable=True):
+        super(FractionalType, self).__init__(nullable)
+
+
+class CharType(AtomicType):
+    """
+    Char data type. SQL CHAR(n)
+
+    The serialized string representation is 'char(n)' where 'n' (default: 1) 
is the number of
+    bytes. 'n' must have a value between 1 and 255 (both inclusive).
 
-class StringType(AtomicType):
+    :param length: int, the string representation length.
+    :param nullable: boolean, whether the type can be null (None) or not.
     """
-    String data type.  SQL VARCHAR
+
+    def __init__(self, length=1, nullable=True):
+        super(CharType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return 'CharType(%d, %s)' % (self.length, str(self.nullable).lower())
+
+
+class VarCharType(AtomicType):
     """
+    Varchar data type. SQL VARCHAR(n)
 
-    __metaclass__ = DataTypeSingleton
+    The serialized string representation is 'varchar(n)' where 'n' (default: 
1) is the number of
+    characters. 'n' must have a value between 1 and 0x7fffffff (both 
inclusive).
 
+    :param length: int, the maximum string representation length.
+    :param nullable: boolean, whether the type can be null (None) or not.
+    """
 
-class BooleanType(AtomicType):
+    def __init__(self, length=1, nullable=True):
+        super(VarCharType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return "VarCharType(%d, %s)" % (self.length, 
str(self.nullable).lower())
+
+
+class BinaryType(AtomicType):
     """
-    Boolean data types. SQL BOOLEAN
+    Binary (byte array) data type. SQL BINARY(n)
+
+    The serialized string representation is 'binary(n)' where 'n' (default: 1) 
is the number of
+    bytes. 'n' must have a value between 1 and 0x7fffffff (both inclusive).
+
+    :param length: int, the number of bytes.
+    :param nullable: boolean, whether the type can be null (None) or not.
+    """
+
+    def __init__(self, length=1, nullable=True):
+        super(BinaryType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return "BinaryType(%d, %s)" % (self.length, str(self.nullable).lower())
+
+
+class VarBinaryType(AtomicType):
     """
+    Binary (byte array) data type. SQL VARBINARY(n)
 
-    __metaclass__ = DataTypeSingleton
+    The serialized string representation is 'varbinary(n)' where 'n' (default: 
1) is the
+    maximum number of bytes. 'n' must have a value between 1 and 0x7fffffff 
(both inclusive).
 
+    :param length: int, the maximum number of bytes.
+    :param nullable: boolean, whether the type can be null (None) or not.
+    """
+
+    def __init__(self, length=1, nullable=True):
+        super(VarBinaryType, self).__init__(nullable)
+        self.length = length
+
+    def __repr__(self):
+        return "VarBinaryType(%d, %s)" % (self.length, 
str(self.nullable).lower())
 
-class ByteType(IntegralType):
+
+class BooleanType(AtomicType):
     """
-    Byte data type. SQL TINYINT
+    Boolean data types. SQL BOOLEAN
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(BooleanType, self).__init__(nullable)
+
 
-class CharType(IntegralType):
+class TinyIntType(IntegralType):
     """
-    Char data type. SQL CHAR
+    Byte data type. SQL TINYINT (8bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(TinyIntType, self).__init__(nullable)
+
 
-class ShortType(IntegralType):
+class SmallIntType(IntegralType):
     """
-    Short data types.  SQL SMALLINT (16bits)
+    Short data type. SQL SMALLINT (16bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(SmallIntType, self).__init__(nullable)
+
 
-class IntegerType(IntegralType):
+class IntType(IntegralType):
     """
     Int data types. SQL INT (32bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(IntType, self).__init__(nullable)
 
-class LongType(IntegralType):
+
+class BigIntType(IntegralType):
     """
     Long data types. SQL BIGINT (64bits)
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
+    def __init__(self, nullable=True):
+        super(BigIntType, self).__init__(nullable)
+
 
 class FloatType(FractionalType):
     """
     Float data type. SQL FLOAT
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(FloatType, self).__init__(nullable)
 
 
 class DoubleType(FractionalType):
     """
     Double data type. SQL DOUBLE
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(DoubleType, self).__init__(nullable)
+
+
+class DecimalType(FractionalType):
+    """
+    Decimal (decimal.Decimal) data type.
+
+    The DecimalType must have fixed precision (the maximum total number of 
digits)
+    and scale (the number of digits on the right of dot). For example, (5, 2) 
can
+    support the value from [-999.99 to 999.99].
+
+    The precision can be up to 38, the scale must be less or equal to 
precision.
+
+    When create a DecimalType, the default precision and scale is (10, 0). 
When infer
+    schema from decimal.Decimal objects, it will be DecimalType(38, 18).
+
+    :param precision: the maximum total number of digits (default: 10)
+    :param scale: the number of digits on right side of dot. (default: 0)
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, precision=10, scale=0, nullable=True):
+        super(DecimalType, self).__init__(nullable)
+        assert 1 <= precision <= 38
+        assert 0 <= scale <= precision
+        self.precision = precision
+        self.scale = scale
+        self.has_precision_info = True  # this is public API
+
+    def __repr__(self):
+        return "DecimalType(%d, %d, %s)" % (self.precision, self.scale, 
str(self.nullable).lower())
 
 
 class DateType(AtomicType):
     """
     Date data type.  SQL DATE
+
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, nullable=True):
+        super(DateType, self).__init__(nullable)
+
+    EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, d):
+        if d is not None:
+            return d.toordinal() - self.EPOCH_ORDINAL
+
+    def from_sql_type(self, v):
+        if v is not None:
+            return datetime.date.fromordinal(v + self.EPOCH_ORDINAL)
 
 
 class TimeType(AtomicType):
     """
     Time data type. SQL TIME
+
+    The precision must be greater than or equal to 0 and less than or equal to 
9.
+
+    :param precision: int, the number of digits of fractional seconds 
(default: 0)
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    EPOCH_ORDINAL = calendar.timegm(time.localtime(0)) * 10**6
+
+    def __init__(self, precision=0, nullable=True):
+        super(TimeType, self).__init__(nullable)
+        assert 0 <= precision <= 9
+        self.precision = precision
+
+    def __repr__(self):
+        return "TimeType(%s, %s)" % (self.precision, 
str(self.nullable).lower())
+
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, t):
+        if t.tzinfo is not None:
+            offset = t.utcoffset()
+            offset = offset if offset else datetime.timedelta()
+            offset_microseconds =\
+                (offset.days * 86400 + offset.seconds) * 10 ** 6 + 
offset.microseconds
+        else:
+            offset_microseconds = self.EPOCH_ORDINAL
+        minutes = t.hour * 60 + t.minute
+        seconds = minutes * 60 + t.second
+        return seconds * 10**6 + t.microsecond - offset_microseconds
+
+    def from_sql_type(self, t):
+        if t is not None:
+            seconds, microseconds = divmod(t, 10**6)
+            minutes, seconds = divmod(seconds, 60)
+            hours, minutes = divmod(minutes, 60)
+            return datetime.time(hours, minutes, seconds, microseconds)
+
+
+class TimestampKind(object):
+    """
+    Timestamp kind for the time attribute metadata to timestamps.
+    """
+    REGULAR = 0
+    ROWTIME = 1
+    PROCTIME = 2
 
 
 class TimestampType(AtomicType):
     """
     Timestamp data type.  SQL TIMESTAMP
+
+    The precision must be greater than or equal to 0 and less than or equal to 
9.
+
+    :param kind, the time attribute metadata (default: TimestampKind.REGULAR)
+    :param precision: int, the number of digits of fractional seconds 
(default: 6)
+    :param nullable: boolean, whether the field can be null (None) or not.
     """
 
-    __metaclass__ = DataTypeSingleton
+    def __init__(self, kind=TimestampKind.REGULAR, precision=6, nullable=True):
+        super(TimestampType, self).__init__(nullable)
+        assert 0 <= kind <= 2
+        assert 0 <= precision <= 9
+        self.kind = kind
+        self.precision = precision
 
+    def __repr__(self):
+        return "TimestampType(%s, %s, %s)" % (
+            self.kind, self.precision, str(self.nullable).lower())
 
-class DataTypes(object):
+    def need_conversion(self):
+        return True
+
+    def to_sql_type(self, dt):
+        if dt is not None:
+            seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+                       else time.mktime(dt.timetuple()))
+            return int(seconds) * 10**6 + dt.microsecond
+
+    def from_sql_type(self, ts):
+        if ts is not None:
+            # using int to avoid precision loss in float
+            return datetime.datetime.fromtimestamp(ts // 
10**6).replace(microsecond=ts % 10**6)
+
+
+class ArrayType(DataType):
+    """
+    Array data type.
+
+    :param element_type: :class:`DataType` of each element in the array.
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, element_type, nullable=True):
+        """
+        >>> ArrayType(VarCharType()) == ArrayType(VarCharType())
+        True
+        >>> ArrayType(VarCharType()) == ArrayType(BigIntType())
+        False
+        """
+        assert isinstance(element_type, DataType), \
+            "element_type %s should be an instance of %s" % (element_type, 
DataType)
+        super(ArrayType, self).__init__(nullable)
+        self.element_type = element_type
+
+    def __repr__(self):
+        return "ArrayType(%s, %s)" % (self.element_type, 
str(self.nullable).lower())
+
+    @classmethod
+    def from_json(cls, json):
+        return ArrayType(_parse_datatype_json_value(json["element_type"]), 
json["nullable"])
+
+    def need_conversion(self):
+        return self.element_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and [self.element_type.to_sql_type(v) for v in obj]
+
+    def from_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and [self.element_type.to_sql_type(v) for v in obj]
+
+
+class MapType(DataType):
+    """
+    Map data type.
+
+    :param key_type: :class:`DataType` of the keys in the map.
+    :param value_type: :class:`DataType` of the values in the map.
+    :param nullable: boolean, whether the field can be null (None) or not.
+
+    Keys in a map data type are not allowed to be null (None).
+    """
+
+    def __init__(self, key_type, value_type, nullable=True):
+        """
+        >>> (MapType(VarCharType(nullable=False), IntType())
+        ...        == MapType(VarCharType(nullable=False), IntType()))
+        True
+        >>> (MapType(VarCharType(nullable=False), IntType())
+        ...        == MapType(VarCharType(nullable=False), FloatType()))
+        False
+        """
+        assert isinstance(key_type, DataType), \
+            "key_type %s should be an instance of %s" % (key_type, DataType)
+        assert isinstance(value_type, DataType), \
+            "value_type %s should be an instance of %s" % (value_type, 
DataType)
+        super(MapType, self).__init__(nullable)
+        self.key_type = key_type
+        self.value_type = value_type
+
+    def __repr__(self):
+        return "MapType(%s, %s, %s)" % (self.key_type, self.value_type, 
str(self.nullable).lower())
+
+    @classmethod
+    def from_json(cls, json):
+        return MapType(_parse_datatype_json_value(json["key_type"]),
+                       _parse_datatype_json_value(json["value_type"]),
+                       json["nullable"])
+
+    def need_conversion(self):
+        return self.key_type.need_conversion() or 
self.value_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and dict((self.key_type.to_sql_type(k), 
self.value_type.to_sql_type(v))
+                            for k, v in obj.items())
+
+    def from_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and dict((self.key_type.from_sql_type(k), 
self.value_type.from_sql_type(v))
+                            for k, v in obj.items())
+
+
+class MultisetType(DataType):
+    """
+    MultisetType data type.
+
+    :param element_type: :class:`DataType` of each element in the multiset.
+    :param nullable: boolean, whether the field can be null (None) or not.
+    """
+
+    def __init__(self, element_type, nullable=True):
+        """
+        >>> MultisetType(VarCharType()) == MultisetType(VarCharType())
+        True
+        >>> MultisetType(VarCharType()) == MultisetType(BigIntType())
+        False
+        """
+        assert isinstance(element_type, DataType), \
+            "element_type %s should be an instance of %s" % (element_type, 
DataType)
+        super(MultisetType, self).__init__(nullable)
+        self.element_type = element_type
+
+    def __repr__(self):
+        return "MultisetType(%s, %s)" % (self.element_type, 
str(self.nullable).lower())
+
+    @classmethod
+    def from_json(cls, json):
+        return MultisetType(_parse_datatype_json_value(json["element_type"]), 
json["nullable"])
+
+    def need_conversion(self):
+        return self.element_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and [self.element_type.to_sql_type(v) for v in obj]
+
+    def from_sql_type(self, obj):
+        if not self.need_conversion():
+            return obj
+        return obj and [self.element_type.to_sql_type(v) for v in obj]
+
+
+class RowField(object):
+    """
+    A field in :class:`RowType`.
+
+    :param name: string, name of the field.
+    :param data_type: :class:`DataType` of the field.
+    :param description: string, description of the field.
+    """
+
+    def __init__(self, name, data_type, description=None):
+        """
+        >>> (RowField("f1", VarCharType())
+        ...      == RowField("f1", VarCharType()))
+        True
+        >>> (RowField("f1", VarCharType())
+        ...      == RowField("f2", VarCharType()))
+        False
+        """
+        assert isinstance(data_type, DataType), \
+            "data_type %s should be an instance of %s" % (data_type, DataType)
+        assert isinstance(name, basestring), "field name %s should be string" 
% name
+        if not isinstance(name, str):
+            name = name.encode('utf-8')
+        if description is not None:
+            assert isinstance(description, basestring), \
+                "description %s should be string" % description
+            if not isinstance(description, str):
+                description = description.encode('utf-8')
+        self.name = name
+        self.data_type = data_type
+        self.description = '...' if description is None else description
+
+    def __repr__(self):
+        return "RowField(%s, %s, %s)" % (self.name, self.data_type, 
self.description)
+
+    def __eq__(self, other):
+        return isinstance(other, self.__class__) and self.__dict__ == 
other.__dict__
+
+    @classmethod
+    def from_json(cls, json):
+        return RowField(json["name"],
+                        _parse_datatype_json_value(json["type"]),
+                        json["description"])
+
+    def need_conversion(self):
+        return self.data_type.need_conversion()
+
+    def to_sql_type(self, obj):
+        return self.data_type.to_sql_type(obj)
+
+    def from_sql_type(self, obj):
+        return self.data_type.from_sql_type(obj)
+
+
+class RowType(DataType):
+    """
+    Row type, consisting of a list of :class:`RowField`.
+
+    This is the data type representing a :class:`Row`.
+
+    Iterating a :class:`RowType` will iterate its :class:`RowField`\\s.
+    A contained :class:`RowField` can be accessed by name or position.
+
+    >>> row1 = RowType([RowField("f1", VarCharType())])
+    >>> row1["f1"]
+    RowField(f1, VarCharType(1))
+    >>> row1[0]
+    RowField(f1, VarCharType(1))
+    """
+
+    def __init__(self, fields=None, nullable=True):
+        """
+        >>> row1 = RowType([RowField("f1", VarCharType())])
+        >>> row2 = RowType([RowField("f1", VarCharType())])
+        >>> row1 == row2
+        True
+        >>> row1 = RowType([RowField("f1", VarCharType())])
+        >>> row2 = RowType([RowField("f1", VarCharType()),
+        ...     RowField("f2", IntType())])
+        >>> row1 == row2
+        False
+        """
+        super(RowType, self).__init__(nullable)
+        if not fields:
+            self.fields = []
+            self.names = []
+        else:
+            self.fields = fields
+            self.names = [f.name for f in fields]
+            assert all(isinstance(f, RowField) for f in fields), \
+                "fields should be a list of RowField"
+        # Precalculated list of fields that need conversion with
+        # from_sql_type/to_sql_type functions
+        self._need_conversion = [f.need_conversion() for f in self]
+        self._need_serialize_any_field = any(self._need_conversion)
+
+    def add(self, field, data_type=None):
+        """
+        Constructs a RowType by adding new elements to it to define the 
schema. The method accepts
+        either:
+
+            a) A single parameter which is a RowField object.
+            b) 2 parameters as (name, data_type). The data_type parameter may 
be either a String
+               or a DataType object.
+
+        >>> row1 = RowType().add("f1", VarCharType()).add("f2", VarCharType())
+        >>> row2 = RowType([RowField("f1", VarCharType()), RowField("f2", 
VarCharType())])
+        >>> row1 == row2
+        True
+        >>> row1 = RowType().add(RowField("f1", VarCharType()))
+        >>> row2 = RowType([RowField("f1", VarCharType())])
+        >>> row1 == row2
+        True
+        >>> row1 = RowType().add("f1", "string")
+        >>> row2 = RowType([RowField("f1", VarCharType())])
+        >>> row1 == row2
+        True
+
+        :param field: Either the name of the field or a RowField object
+        :param data_type: If present, the DataType of the RowField to create
+        :return: a new updated RowType
+        """
+        if isinstance(field, RowField):
+            self.fields.append(field)
+            self.names.append(field.name)
+        else:
+            if isinstance(field, str) and data_type is None:
+                raise ValueError("Must specify DataType if passing name of 
row_field to create.")
+
+            if isinstance(data_type, str):
+                data_type_f = _parse_datatype_json_value(data_type)
+            else:
+                data_type_f = data_type
+            self.fields.append(RowField(field, data_type_f))
+            self.names.append(field)
+        # Precalculated list of fields that need conversion with
+        # from_sql_type/to_sql_type functions
+        self._need_conversion = [f.need_conversion() for f in self]
+        self._need_serialize_any_field = any(self._need_conversion)
+        return self
+
+    def __iter__(self):
+        """
+        Iterate the fields.
+        """
+        return iter(self.fields)
+
+    def __len__(self):
+        """
+        Returns the number of fields.
+        """
+        return len(self.fields)
+
+    def __getitem__(self, key):
+        """
+        Accesses fields by name or slice.
+        """
+        if isinstance(key, str):
+            for field in self:
+                if field.name == key:
+                    return field
+            raise KeyError('No RowField named {0}'.format(key))
+        elif isinstance(key, int):
+            try:
+                return self.fields[key]
+            except IndexError:
+                raise IndexError('RowType index out of range')
+        elif isinstance(key, slice):
+            return RowType(self.fields[key])
+        else:
+            raise TypeError('RowType keys should be strings, integers or 
slices')
+
+    def __repr__(self):
+        return "RowType(%s)" % ",".join(str(field) for field in self)
+
+    @classmethod
+    def from_json(cls, json):
+        return RowType([RowField.from_json(f) for f in json["fields"]], 
json["nullable"])
+
+    def field_names(self):
+        """
+        Returns all field names in a list.
+
+        >>> row = RowType([RowField("f1", VarCharType())])
+        >>> row.field_names()
+        ['f1']
+        """
+        return list(self.names)
+
+    def need_conversion(self):
+        # We need convert Row()/namedtuple into tuple()
+        return True
+
+    def to_sql_type(self, obj):
+        if obj is None:
+            return
+
+        if self._need_serialize_any_field:
+            # Only calling to_sql_type function for fields that need conversion
+            if isinstance(obj, dict):
+                return tuple(f.to_sql_type(obj.get(n)) if c else obj.get(n)
+                             for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
+            elif isinstance(obj, (tuple, list)):
+                return tuple(f.to_sql_type(v) if c else v
+                             for f, v, c in zip(self.fields, obj, 
self._need_conversion))
+            elif hasattr(obj, "__dict__"):
+                d = obj.__dict__
+                return tuple(f.to_sql_type(d.get(n)) if c else d.get(n)
+                             for n, f, c in zip(self.names, self.fields, 
self._need_conversion))
+            else:
+                raise ValueError("Unexpected tuple %r with RowType" % obj)
+        else:
+            if isinstance(obj, dict):
+                return tuple(obj.get(n) for n in self.names)
+            elif isinstance(obj, Row) and getattr(obj, "_from_dict", False):
+                return tuple(obj[n] for n in self.names)
+            elif isinstance(obj, (list, tuple)):
+                return tuple(obj)
+            elif hasattr(obj, "__dict__"):
+                d = obj.__dict__
+                return tuple(d.get(n) for n in self.names)
+            else:
+                raise ValueError("Unexpected tuple %r with RowType" % obj)
+
+    def from_sql_type(self, obj):
+        if obj is None:
+            return
+        if isinstance(obj, Row):
+            # it's already converted by pickler
+            return obj
+        if self._need_serialize_any_field:
+            # Only calling from_sql_type function for fields that need 
conversion
+            values = [f.from_sql_type(v) if c else v
+                      for f, v, c in zip(self.fields, obj, 
self._need_conversion)]
+        else:
+            values = obj
+        return _create_row(self.names, values)
+
+
+class UserDefinedType(DataType):
+    """
+    User-defined type (UDT).
+
+    .. note:: WARN: Flink Internal Use Only
+    """
+
+    def __eq__(self, other):
+        return type(self) == type(other)
+
+    @classmethod
+    def type_name(cls):
+        return cls.__name__.lower()
+
+    @classmethod
+    def sql_type(cls):
+        """
+        Underlying SQL storage type for this UDT.
+        """
+        raise NotImplementedError("UDT must implement sql_type().")
+
+    @classmethod
+    def module(cls):
+        """
+        The Python module of the UDT.
+        """
+        raise NotImplementedError("UDT must implement module().")
+
+    @classmethod
+    def java_udt(cls):
+        """
+        The class name of the paired Java UDT (could be '', if there
+        is no corresponding one).
+        """
+        return ''
+
+    def need_conversion(self):
+        return True
+
+    @classmethod
+    def _cached_sql_type(cls):
+        """
+        Caches the sql_type() into class, because it's heavy used in 
`to_sql_type`.
+        """
+        if not hasattr(cls, "__cached_sql_type"):
+            cls.__cached_sql_type = cls.sql_type()
+        return cls.__cached_sql_type
+
+    def to_sql_type(self, obj):
+        if obj is not None:
+            return self._cached_sql_type().to_sql_type(self.serialize(obj))
+
+    def from_sql_type(self, obj):
+        v = self._cached_sql_type().from_sql_type(obj)
+        if v is not None:
+            return self.deserialize(v)
+
+    def serialize(self, obj):
+        """
+        Converts the a user-type object into a SQL datum.
+        """
+        raise NotImplementedError("UDT must implement serialize().")
+
+    def deserialize(self, datum):
+        """
+        Converts a SQL datum into a user-type object.
+        """
+        raise NotImplementedError("UDT must implement deserialize().")
+
+
+_static_length_types = [BooleanType, FloatType, DoubleType, TinyIntType,
+                        SmallIntType, IntType, BigIntType, DateType]
+_static_length_type_mappings = dict((t.type_name(), t) for t in 
_static_length_types)
+
+_var_length_types = [BinaryType, VarBinaryType, CharType, VarCharType]
+_var_length_type_mappings = dict((t.type_name(), t) for t in _var_length_types)
+
+_complex_type_mappings = dict(
+    (t.type_name(), t) for t in [ArrayType, MapType, MultisetType, RowType])
+
+
+def _parse_datatype_json_value(json_value):
 
 Review comment:
   Same as the `from_json` comment. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to