dianfu commented on a change in pull request #8561: [FLINK-12588][python] Add TableSchema for Python Table API.
URL: https://github.com/apache/flink/pull/8561#discussion_r288526645
 
 

 ##########
 File path: flink-python/pyflink/table/types.py
 ##########
 @@ -1273,6 +1318,190 @@ def _to_java_type(data_type):
         raise TypeError("Not supported type: %s" % data_type)
 
 
+_primitive_to_boxed_map = {'int': 'java.lang.Integer',
+                           'long': 'java.lang.Long',
+                           'byte': 'java.lang.Byte',
+                           'short': 'java.lang.Short',
+                           'char': 'java.lang.Character',
+                           'boolean': 'java.lang.Boolean',
+                           'float': 'java.lang.Float',
+                           'double': 'java.lang.Double'}
+
+
def is_instance_of(java_data_type, java_class):
    """
    Checks on the JVM side whether ``java_data_type`` is an instance of ``java_class``.

    :param java_data_type: Py4J handle of the Java object to check.
    :param java_class: the class to check against: a class-name string, a Py4J
        ``JavaClass``, or a ``JavaObject``. A ``JavaObject`` that is itself a
        ``java.lang.Class`` is used as-is; any other ``JavaObject`` is replaced
        by its runtime class (``getClass()``).
    :return: the result of ``TypeUtil.isInstanceOf(<class>, java_data_type)``.
    :raises TypeError: if ``java_class`` is none of the accepted kinds.
    """
    gateway = get_gateway()

    if isinstance(java_class, basestring):
        clazz = java_class
    elif isinstance(java_class, JavaClass):
        clazz = get_java_class(java_class)
    elif isinstance(java_class, JavaObject):
        # A Class object is checked against directly; any other object stands
        # for its own runtime class.
        is_class_object = is_instance_of(java_class, gateway.jvm.Class)
        clazz = java_class if is_class_object else java_class.getClass()
    else:
        raise TypeError(
            "java_class must be a string, a JavaClass, or a JavaObject")

    # NOTE(review): confirm this package matches where py4j's TypeUtil lives
    # in the Flink build (e.g. whether py4j is relocated/shaded).
    type_util = gateway.jvm.org.apache.flink.api.python.py4j.reflection.TypeUtil
    return type_util.isInstanceOf(clazz, java_data_type)
+
+
def _bridge_conversion_class(python_type, conversion_clz):
    """
    Bridges ``python_type`` to the Java conversion class ``conversion_clz``.

    Java primitive class names (e.g. ``int``) are replaced by the matching boxed
    class name from ``_primitive_to_boxed_map``; any other class name is used
    unchanged. Does nothing when ``conversion_clz`` is None.

    :param python_type: the converted Python data type to bridge.
    :param conversion_clz: Py4J handle of a ``java.lang.Class``, or None.
    """
    if conversion_clz is not None:
        type_class_name = conversion_clz.getName()
        python_type.bridged_to(_primitive_to_boxed_map.get(type_class_name, type_class_name))


def _to_python_type(java_data_type_input):
    """
    Converts a Java ``DataType`` (or a legacy ``TypeInformation``) to the
    corresponding Python data type.

    Nullability and type parameters (length, precision, scale, timestamp kind,
    element/key/value/field types) are carried over. The result is bridged to
    the Java type's conversion class, with primitive classes mapped to their
    boxed counterparts.

    :param java_data_type_input: Py4J handle of either a
        ``org.apache.flink.api.common.typeinfo.TypeInformation`` or a Java
        ``DataType``.
    :return: the equivalent Python data type.
    :raises TypeError: if the type, or one of its components, has no Python
        counterpart currently.
    """
    gateway = get_gateway()

    if is_instance_of(java_data_type_input, gateway.jvm.TypeInformation):
        # Legacy TypeInformation input: convert it to a DataType first.
        legacy_converter = \
            gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
        java_data_type = legacy_converter.toDataType(java_data_type_input)
    else:
        # The input is already a DataType.
        java_data_type = java_data_type_input

    # Atomic types, possibly with parameters.
    if is_instance_of(java_data_type, gateway.jvm.AtomicDataType):
        logical_type = java_data_type.getLogicalType()
        conversion_clz = java_data_type.getConversionClass()
        nullable = logical_type.isNullable()
        if is_instance_of(logical_type, gateway.jvm.CharType):
            python_type = DataTypes.CHAR(logical_type.getLength(), nullable)
        elif is_instance_of(logical_type, gateway.jvm.VarCharType):
            python_type = DataTypes.VARCHAR(logical_type.getLength(), nullable)
        elif is_instance_of(logical_type, gateway.jvm.BinaryType):
            python_type = DataTypes.BINARY(logical_type.getLength(), nullable)
        elif is_instance_of(logical_type, gateway.jvm.VarBinaryType):
            python_type = DataTypes.VARBINARY(logical_type.getLength(), nullable)
        elif is_instance_of(logical_type, gateway.jvm.DecimalType):
            python_type = DataTypes.DECIMAL(logical_type.getPrecision(),
                                            logical_type.getScale(),
                                            nullable)
        elif is_instance_of(logical_type, gateway.jvm.TimeType):
            python_type = DataTypes.TIME(logical_type.getPrecision(), nullable)
        elif is_instance_of(logical_type, gateway.jvm.TimestampType):
            j_kind = logical_type.getKind()
            if j_kind == gateway.jvm.TimestampKind.REGULAR:
                kind = TimestampKind.REGULAR
            elif j_kind == gateway.jvm.TimestampKind.ROWTIME:
                kind = TimestampKind.ROWTIME
            elif j_kind == gateway.jvm.TimestampKind.PROCTIME:
                kind = TimestampKind.PROCTIME
            else:
                raise TypeError("Unsupported data type: %s, timestamp kind %s is not "
                                "supported currently." % (java_data_type_input, j_kind))
            python_type = DataTypes.TIMESTAMP(kind, logical_type.getPrecision(), nullable)
        elif is_instance_of(logical_type, gateway.jvm.BooleanType):
            python_type = DataTypes.BOOLEAN(nullable)
        elif is_instance_of(logical_type, gateway.jvm.TinyIntType):
            python_type = DataTypes.TINYINT(nullable)
        elif is_instance_of(logical_type, gateway.jvm.SmallIntType):
            python_type = DataTypes.SMALLINT(nullable)
        elif is_instance_of(logical_type, gateway.jvm.IntType):
            python_type = DataTypes.INT(nullable)
        elif is_instance_of(logical_type, gateway.jvm.BigIntType):
            python_type = DataTypes.BIGINT(nullable)
        elif is_instance_of(logical_type, gateway.jvm.FloatType):
            python_type = DataTypes.FLOAT(nullable)
        elif is_instance_of(logical_type, gateway.jvm.DoubleType):
            python_type = DataTypes.DOUBLE(nullable)
        elif is_instance_of(logical_type, gateway.jvm.DateType):
            python_type = DataTypes.DATE(nullable)
        elif is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
            raise TypeError("Unsupported data type: %s, ZonedTimestampType is not "
                            "supported currently." % java_data_type_input)
        elif is_instance_of(logical_type, gateway.jvm.LocalZonedTimestampType):
            raise TypeError("Unsupported data type: %s, LocalZonedTimestampType is not "
                            "supported currently." % java_data_type_input)
        elif is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType):
            raise TypeError("Unsupported data type: %s, DayTimeIntervalType is not "
                            "supported currently." % java_data_type_input)
        elif is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
            raise TypeError("Unsupported data type: %s, YearMonthIntervalType is not "
                            "supported currently." % java_data_type_input)
        elif is_instance_of(logical_type, gateway.jvm.LegacyTypeInformationType):
            # Only the legacy String[] type info has a Python counterpart so far.
            type_info = logical_type.getTypeInformation()
            basic_array_type_info = \
                gateway.jvm.org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
            if type_info == basic_array_type_info.STRING_ARRAY_TYPE_INFO:
                python_type = DataTypes.ARRAY(DataTypes.STRING())
            else:
                raise TypeError("Unsupported data type: %s, it is recognized as a "
                                "legacy type." % type_info)
        else:
            raise TypeError("Unsupported data type: %s" % java_data_type_input)

    # Array type, multiset type.
    elif is_instance_of(java_data_type, gateway.jvm.CollectionDataType):
        logical_type = java_data_type.getLogicalType()
        element_type = java_data_type.getElementDataType()
        conversion_clz = java_data_type.getConversionClass()
        if is_instance_of(logical_type, gateway.jvm.ArrayType):
            python_type = DataTypes.ARRAY(_to_python_type(element_type),
                                          logical_type.isNullable())
        elif is_instance_of(logical_type, gateway.jvm.MultisetType):
            python_type = DataTypes.MULTISET(_to_python_type(element_type),
                                             logical_type.isNullable())
        else:
            raise TypeError("Unsupported data type: %s" % java_data_type_input)

    # Map type.
    elif is_instance_of(java_data_type, gateway.jvm.KeyValueDataType):
        logical_type = java_data_type.getLogicalType()
        key_type = java_data_type.getKeyDataType()
        value_type = java_data_type.getValueDataType()
        conversion_clz = java_data_type.getConversionClass()
        if is_instance_of(logical_type, gateway.jvm.MapType):
            python_type = DataTypes.MAP(_to_python_type(key_type),
                                        _to_python_type(value_type),
                                        logical_type.isNullable())
        else:
            raise TypeError("Unsupported data type: %s" % java_data_type_input)

    # Row type.
    elif is_instance_of(java_data_type, gateway.jvm.FieldsDataType):
        logical_type = java_data_type.getLogicalType()
        field_data_types = java_data_type.getFieldDataTypes()
        conversion_clz = java_data_type.getConversionClass()
        if is_instance_of(logical_type, gateway.jvm.RowType):
            # Walk RowType.getFieldNames() (the declared field order) instead of
            # iterating the field map, whose order depends on the Java Map
            # implementation.
            fields = [DataTypes.FIELD(name, _to_python_type(field_data_types[name]))
                      for name in logical_type.getFieldNames()]
            python_type = DataTypes.ROW(fields, logical_type.isNullable())
        else:
            raise TypeError("Unsupported data type: %s" % java_data_type_input)

    # Unrecognized type: fail loudly instead of silently returning None.
    else:
        raise TypeError("Unsupported data type: %s" % java_data_type_input)

    _bridge_conversion_class(python_type, conversion_clz)
    return python_type
 
 Review comment:
   What about changing all the error messages to "Unsupported data type: %s"?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to