sunjincheng121 commented on a change in pull request #8561: 
[FLINK-12588][python] Add TableSchema for Python Table API.
URL: https://github.com/apache/flink/pull/8561#discussion_r288812260
 
 

 ##########
 File path: flink-python/pyflink/table/types.py
 ##########
 @@ -1273,6 +1318,190 @@ def _to_java_type(data_type):
         raise TypeError("Not supported type: %s" % data_type)
 
 
+_primitive_to_boxed_map = {'int': 'java.lang.Integer',
+                           'long': 'java.lang.Long',
+                           'byte': 'java.lang.Byte',
+                           'short': 'java.lang.Short',
+                           'char': 'java.lang.Character',
+                           'boolean': 'java.lang.Boolean',
+                           'float': 'java.lang.Float',
+                           'double': 'java.lang.Double'}
+
+
+def _is_instance_of(java_data_type, java_class):
+    gateway = get_gateway()
+    if isinstance(java_class, basestring):
+        param = java_class
+    elif isinstance(java_class, JavaClass):
+        param = get_java_class(java_class)
+    elif isinstance(java_class, JavaObject):
+        if not _is_instance_of(java_class, gateway.jvm.Class):
+            param = java_class.getClass()
+        else:
+            param = java_class
+    else:
+        raise TypeError(
+            "java_class must be a string, a JavaClass, or a JavaObject")
+
+    return 
gateway.jvm.org.apache.flink.api.python.py4j.reflection.TypeUtil.isInstanceOf(
+        param, java_data_type)
+
+
+def _from_java_type(j_data_type):
+    gateway = get_gateway()
+
+    if _is_instance_of(j_data_type, gateway.jvm.TypeInformation):
+        # input is TypeInformation
+        LegacyTypeInfoDataTypeConverter = \
+            
gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
+        java_data_type = 
LegacyTypeInfoDataTypeConverter.toDataType(j_data_type)
+    else:
+        # input is DataType
+        java_data_type = j_data_type
+
+    # Atomic Type with parameters.
+    if _is_instance_of(java_data_type, gateway.jvm.AtomicDataType):
+        logical_type = java_data_type.getLogicalType()
+        conversion_clz = java_data_type.getConversionClass()
+        if _is_instance_of(logical_type, gateway.jvm.CharType):
+            data_type = DataTypes.CHAR(logical_type.getLength(), 
logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.VarCharType):
+            data_type = DataTypes.VARCHAR(logical_type.getLength(), 
logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.BinaryType):
+            data_type = DataTypes.BINARY(logical_type.getLength(), 
logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.VarBinaryType):
+            data_type = DataTypes.VARBINARY(logical_type.getLength(), 
logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.DecimalType):
+            data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
+                                            logical_type.getScale(),
+                                            logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.TimeType):
+            data_type = DataTypes.TIME(logical_type.getPrecision(), 
logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.TimestampType):
+            j_kind = logical_type.getKind()
+            kind = None
+            if j_kind == gateway.jvm.TimestampKind.REGULAR:
+                kind = TimestampKind.REGULAR
+            elif j_kind == gateway.jvm.TimestampKind.ROWTIME:
+                kind = TimestampKind.ROWTIME
+            elif j_kind == gateway.jvm.TimestampKind.PROCTIME:
+                kind = TimestampKind.PROCTIME
+            if kind is None:
+                raise Exception("Unsupported java timestamp kind %s" % j_kind)
+            data_type = DataTypes.TIMESTAMP(kind,
+                                              logical_type.getPrecision(),
+                                              logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.BooleanType):
+            data_type = DataTypes.BOOLEAN(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.TinyIntType):
+            data_type = DataTypes.TINYINT(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.SmallIntType):
+            data_type = DataTypes.SMALLINT(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.IntType):
+            data_type = DataTypes.INT(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.BigIntType):
+            data_type = DataTypes.BIGINT(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.FloatType):
+            data_type = DataTypes.FLOAT(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.DoubleType):
+            data_type = DataTypes.DOUBLE(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.DateType):
+            data_type = DataTypes.DATE(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.TimeType):
+            data_type = DataTypes.TIME(logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.ZonedTimestampType):
+            raise \
+                TypeError("Unsupported type: %s, ZonedTimestampType is not 
supported yet."
+                          % j_data_type)
+        elif _is_instance_of(logical_type, 
gateway.jvm.LocalZonedTimestampType):
+            raise \
+                TypeError("Unsupported type: %s, LocalZonedTimestampType is 
not supported "
+                          "currently." % j_data_type)
+        elif _is_instance_of(logical_type, gateway.jvm.DayTimeIntervalType):
+            raise \
+                TypeError("Unsupported type: %s, DayTimeIntervalType is not 
supported yet."
+                          % j_data_type)
+        elif _is_instance_of(logical_type, gateway.jvm.YearMonthIntervalType):
+            raise \
+                TypeError("Unsupported type: %s, YearMonthIntervalType is not 
supported "
+                          "currently." % j_data_type)
+        elif _is_instance_of(logical_type, 
gateway.jvm.LegacyTypeInformationType):
+            type_info = logical_type.getTypeInformation()
+            BasicArrayTypeInfo = 
gateway.jvm.org.apache.flink.api.common.typeinfo.\
+                BasicArrayTypeInfo
+            if type_info == BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO:
+                data_type = DataTypes.ARRAY(DataTypes.STRING())
+            else:
+                raise TypeError("Unsupported type: %s, it is recognized as a 
legacy type."
+                                % type_info)
+        else:
+            raise TypeError("Unsupported type: %s, it is not supported yet in 
current python type"
+                            " system" % j_data_type)
+
+        if conversion_clz is not None:
+            type_class_name = conversion_clz.getName()
+            if type_class_name in _primitive_to_boxed_map:
+                type_class_name = _primitive_to_boxed_map[type_class_name]
+            data_type.bridged_to(type_class_name)
+        return data_type
+
+    # Array Type, MultiSet Type.
+    elif _is_instance_of(java_data_type, gateway.jvm.CollectionDataType):
+        logical_type = java_data_type.getLogicalType()
+        element_type = java_data_type.getElementDataType()
+        conversion_clz = java_data_type.getConversionClass()
+        if _is_instance_of(logical_type, gateway.jvm.ArrayType):
+            data_type = DataTypes.ARRAY(_from_java_type(element_type), 
logical_type.isNullable())
+        elif _is_instance_of(logical_type, gateway.jvm.MultisetType):
+            data_type = DataTypes.MULTISET(_from_java_type(element_type),
+                                             logical_type.isNullable())
 
 Review comment:
   Same above

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to