sunjincheng121 commented on a change in pull request #8561:
[FLINK-12588][python] Add TableSchema for Python Table API.
URL: https://github.com/apache/flink/pull/8561#discussion_r288812196
##########
File path: flink-python/pyflink/table/types.py
##########
@@ -1273,6 +1318,190 @@ def _to_java_type(data_type):
raise TypeError("Not supported type: %s" % data_type)
+_primitive_to_boxed_map = {'int': 'java.lang.Integer',
+ 'long': 'java.lang.Long',
+ 'byte': 'java.lang.Byte',
+ 'short': 'java.lang.Short',
+ 'char': 'java.lang.Character',
+ 'boolean': 'java.lang.Boolean',
+ 'float': 'java.lang.Float',
+ 'double': 'java.lang.Double'}
+
+
+def _is_instance_of(java_data_type, java_class):
+ gateway = get_gateway()
+ if isinstance(java_class, basestring):
+ param = java_class
+ elif isinstance(java_class, JavaClass):
+ param = get_java_class(java_class)
+ elif isinstance(java_class, JavaObject):
+ if not _is_instance_of(java_class, gateway.jvm.Class):
+ param = java_class.getClass()
+ else:
+ param = java_class
+ else:
+ raise TypeError(
+ "java_class must be a string, a JavaClass, or a JavaObject")
+
+ return gateway.jvm.org.apache.flink.api.python.py4j.reflection.TypeUtil.isInstanceOf(
+ param, java_data_type)
+
+
+def _from_java_type(j_data_type):
+ gateway = get_gateway()
+
+ if _is_instance_of(j_data_type, gateway.jvm.TypeInformation):
+ # input is TypeInformation
+ LegacyTypeInfoDataTypeConverter = \
+ gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
+ java_data_type = LegacyTypeInfoDataTypeConverter.toDataType(j_data_type)
+ else:
+ # input is DataType
+ java_data_type = j_data_type
+
+ # Atomic Type with parameters.
+ if _is_instance_of(java_data_type, gateway.jvm.AtomicDataType):
+ logical_type = java_data_type.getLogicalType()
+ conversion_clz = java_data_type.getConversionClass()
+ if _is_instance_of(logical_type, gateway.jvm.CharType):
+ data_type = DataTypes.CHAR(logical_type.getLength(), logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.VarCharType):
+ data_type = DataTypes.VARCHAR(logical_type.getLength(), logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.BinaryType):
+ data_type = DataTypes.BINARY(logical_type.getLength(), logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.VarBinaryType):
+ data_type = DataTypes.VARBINARY(logical_type.getLength(), logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.DecimalType):
+ data_type = DataTypes.DECIMAL(logical_type.getPrecision(),
+ logical_type.getScale(),
+ logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.TimeType):
+ data_type = DataTypes.TIME(logical_type.getPrecision(), logical_type.isNullable())
+ elif _is_instance_of(logical_type, gateway.jvm.TimestampType):
+ j_kind = logical_type.getKind()
+ kind = None
+ if j_kind == gateway.jvm.TimestampKind.REGULAR:
+ kind = TimestampKind.REGULAR
+ elif j_kind == gateway.jvm.TimestampKind.ROWTIME:
+ kind = TimestampKind.ROWTIME
+ elif j_kind == gateway.jvm.TimestampKind.PROCTIME:
+ kind = TimestampKind.PROCTIME
+ if kind is None:
+ raise Exception("Unsupported java timestamp kind %s" % j_kind)
+ data_type = DataTypes.TIMESTAMP(kind,
+ logical_type.getPrecision(),
Review comment:
Same as above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services