[ 
https://issues.apache.org/jira/browse/FLINK-30124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated FLINK-30124:
-----------------------------
    Fix Version/s: ml-2.2.0

> GenericType<java.util.Map> is not supported in PyFlink currently
> ----------------------------------------------------------------
>
>                 Key: FLINK-30124
>                 URL: https://issues.apache.org/jira/browse/FLINK-30124
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.1.0
>            Reporter: Yunfeng Zhou
>            Assignee: Yunfeng Zhou
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.2.0
>
>
> When we add the following test case to 
> flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py in the 
> Flink ML repository and execute it,
> {code:java}
> def test_get_model_data(self):
> model_data = self.estimator.fit(self.train_data).get_model_data()[0]
> self.t_env.to_data_stream(model_data).execute_and_collect().next(){code}
> The following exception is thrown:
>  
> {code:java}
> j_type_info = JavaObject id=o698
>     def _from_java_type(j_type_info: JavaObject) -> TypeInformation:
>         gateway = get_gateway()
>         JBasicTypeInfo = 
> gateway.jvm.org.apache.flink.api.common.typeinfo.BasicTypeInfo
>     
>         if _is_instance_of(j_type_info, JBasicTypeInfo.STRING_TYPE_INFO):
>             return Types.STRING()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.BOOLEAN_TYPE_INFO):
>             return Types.BOOLEAN()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.BYTE_TYPE_INFO):
>             return Types.BYTE()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.SHORT_TYPE_INFO):
>             return Types.SHORT()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.INT_TYPE_INFO):
>             return Types.INT()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.LONG_TYPE_INFO):
>             return Types.LONG()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.FLOAT_TYPE_INFO):
>             return Types.FLOAT()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.DOUBLE_TYPE_INFO):
>             return Types.DOUBLE()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.CHAR_TYPE_INFO):
>             return Types.CHAR()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_INT_TYPE_INFO):
>             return Types.BIG_INT()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.BIG_DEC_TYPE_INFO):
>             return Types.BIG_DEC()
>         elif _is_instance_of(j_type_info, JBasicTypeInfo.INSTANT_TYPE_INFO):
>             return Types.INSTANT()
>     
>         JSqlTimeTypeInfo = 
> gateway.jvm.org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
>         if _is_instance_of(j_type_info, JSqlTimeTypeInfo.DATE):
>             return Types.SQL_DATE()
>         elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIME):
>             return Types.SQL_TIME()
>         elif _is_instance_of(j_type_info, JSqlTimeTypeInfo.TIMESTAMP):
>             return Types.SQL_TIMESTAMP()
>     
>         JPrimitiveArrayTypeInfo = 
> gateway.jvm.org.apache.flink.api.common.typeinfo \
>             .PrimitiveArrayTypeInfo
>     
>         if _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.BOOLEAN())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.BYTE())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.SHORT())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.INT())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.LONG())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.FLOAT())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.DOUBLE())
>         elif _is_instance_of(j_type_info, 
> JPrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO):
>             return Types.PRIMITIVE_ARRAY(Types.CHAR())
>     
>         JBasicArrayTypeInfo = 
> gateway.jvm.org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
>     
>         if _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.BOOLEAN())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.BYTE())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.SHORT())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.INT_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.INT())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.LONG())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.FLOAT())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.DOUBLE())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.CHAR_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.CHAR())
>         elif _is_instance_of(j_type_info, 
> JBasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO):
>             return Types.BASIC_ARRAY(Types.STRING())
>     
>         JObjectArrayTypeInfo = 
> gateway.jvm.org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
>         if _is_instance_of(j_type_info, JObjectArrayTypeInfo):
>             return 
> Types.OBJECT_ARRAY(_from_java_type(j_type_info.getComponentInfo()))
>     
>         JPickledBytesTypeInfo = gateway.jvm \
>             
> .org.apache.flink.streaming.api.typeinfo.python.PickledByteArrayTypeInfo\
>             .PICKLED_BYTE_ARRAY_TYPE_INFO
>         if _is_instance_of(j_type_info, JPickledBytesTypeInfo):
>             return Types.PICKLED_BYTE_ARRAY()
>     
>         JRowTypeInfo = 
> gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo
>         if _is_instance_of(j_type_info, JRowTypeInfo):
>             j_row_field_names = j_type_info.getFieldNames()
>             j_row_field_types = j_type_info.getFieldTypes()
>             row_field_types = [_from_java_type(j_row_field_type) for 
> j_row_field_type in
>                                j_row_field_types]
>             row_field_names = [field_name for field_name in j_row_field_names]
>             return Types.ROW_NAMED(row_field_names, row_field_types)
>     
>         JTupleTypeInfo = 
> gateway.jvm.org.apache.flink.api.java.typeutils.TupleTypeInfo
>         if _is_instance_of(j_type_info, JTupleTypeInfo):
>             j_field_types = []
>             for i in range(j_type_info.getArity()):
>                 j_field_types.append(j_type_info.getTypeAt(i))
>             field_types = [_from_java_type(j_field_type) for j_field_type in 
> j_field_types]
>             return TupleTypeInfo(field_types)
>     
>         JMapTypeInfo = 
> get_gateway().jvm.org.apache.flink.api.java.typeutils.MapTypeInfo
>         if _is_instance_of(j_type_info, JMapTypeInfo):
>             j_key_type_info = j_type_info.getKeyTypeInfo()
>             j_value_type_info = j_type_info.getValueTypeInfo()
>             return MapTypeInfo(_from_java_type(j_key_type_info), 
> _from_java_type(j_value_type_info))
>     
>         JListTypeInfo = 
> get_gateway().jvm.org.apache.flink.api.java.typeutils.ListTypeInfo
>         if _is_instance_of(j_type_info, JListTypeInfo):
>             j_element_type_info = j_type_info.getElementTypeInfo()
>             return ListTypeInfo(_from_java_type(j_element_type_info))
>     
>         JExternalTypeInfo = 
> gateway.jvm.org.apache.flink.table.runtime.typeutils.ExternalTypeInfo
>         if _is_instance_of(j_type_info, JExternalTypeInfo):
>             TypeInfoDataTypeConverter = \
>                 
> gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
>             return ExternalTypeInfo(_from_java_type(
>                 
> TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
>     
> >       raise TypeError("The java type info: %s is not supported in PyFlink 
> > currently." % j_type_info)
> E       TypeError: The java type info: GenericType<java.util.Map> is not 
> supported in PyFlink currently.
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to