[ https://issues.apache.org/jira/browse/FLINK-25856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Huang Xingbo resolved FLINK-25856.
----------------------------------
    Resolution: Fixed

Merged into master via 6056680e148cb32323aabb307ed6f71b3eb3aec4
Merged into release-1.14 via cfc54caf4882a7339b02d06b3c43df64feb02ad5

> Fix use of UserDefinedType in from_elements
> -------------------------------------------
>
>                 Key: FLINK-25856
>                 URL: https://issues.apache.org/jira/browse/FLINK-25856
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.15.0, 1.14.3
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.14.4
>
>
> If we define a new UserDefinedType and use it in `from_elements`, the call fails.
> {code:python}
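> from pyflink.table import DataTypes
> from pyflink.table.types import UserDefinedType
> from pyflink.ml.core.linalg import DenseVector, SparseVector
>
> class VectorUDT(UserDefinedType):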
>     @classmethod
>     def sql_type(cls):
>         return DataTypes.ROW(
>             [
>                 DataTypes.FIELD("type", DataTypes.TINYINT()),
>                 DataTypes.FIELD("size", DataTypes.INT()),
>                 DataTypes.FIELD("indices", DataTypes.ARRAY(DataTypes.INT())),
>                 DataTypes.FIELD("values", DataTypes.ARRAY(DataTypes.DOUBLE())),
>             ]
>         )
>     @classmethod
>     def module(cls):
>         return "pyflink.ml.core.linalg"
>     def serialize(self, obj):
>         if isinstance(obj, SparseVector):
>             indices = [int(i) for i in obj._indices]
>             values = [float(v) for v in obj._values]
>             return 0, obj.size(), indices, values
>         elif isinstance(obj, DenseVector):
>             values = [float(v) for v in obj._values]
>             return 1, None, None, values
>         else:
>             # str.format() needs {!r} placeholders; the original %r was never substituted
>             raise TypeError("Cannot serialize {!r} of type {!r}".format(obj, type(obj)))
> {code}
> {code:python}
> self.t_env.from_elements(
>     [
>         (Vectors.dense([1, 2, 3, 4]), 0., 1.),
>         (Vectors.dense([2, 2, 3, 4]), 0., 2.),
>         (Vectors.dense([3, 2, 3, 4]), 0., 3.),
>         (Vectors.dense([4, 2, 3, 4]), 0., 4.),
>         (Vectors.dense([5, 2, 3, 4]), 0., 5.),
>         (Vectors.dense([11, 2, 3, 4]), 1., 1.),
>         (Vectors.dense([12, 2, 3, 4]), 1., 2.),
>         (Vectors.dense([13, 2, 3, 4]), 1., 3.),
>         (Vectors.dense([14, 2, 3, 4]), 1., 4.),
>         (Vectors.dense([15, 2, 3, 4]), 1., 5.),
>     ],
>     DataTypes.ROW([
>         DataTypes.FIELD("features", VectorUDT()),
>         DataTypes.FIELD("label", DataTypes.DOUBLE()),
>         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
>     ]),
> )
> {code}
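The plain-Python sketch below shows the conversion that `from_elements` has to perform for a UDT column. It is a hypothetical illustration and not PyFlink's implementation or the merged fix. `DataType`, `AtomicType`, `ArrayType`, `Field`, `RowType`, `UserDefinedType`, `DenseVector`, `DenseVectorUDT` and `to_internal` are stand-ins defined only here. The sketch assumes that a UDT value must first pass through `serialize()`, so that it matches the shape declared by `sql_type()` (as `VectorUDT` in the report does), before it is converted field by field.

{code:python}
from dataclasses import dataclass
from typing import Any, List, Sequence


# Hypothetical stand-ins for type classes; NOT the pyflink.table.types API.
class DataType:
    pass


@dataclass
class AtomicType(DataType):
    name: str  # e.g. "DOUBLE"; atomic values pass through unchanged


@dataclass
class ArrayType(DataType):
    element_type: DataType


@dataclass
class Field:
    name: str
    data_type: DataType


@dataclass
class RowType(DataType):
    fields: List[Field]


class UserDefinedType(DataType):
    """Hypothetical UDT base: maps a Python object onto sql_type()."""

    def sql_type(self) -> DataType:
        raise NotImplementedError

    def serialize(self, obj: Any) -> Any:
        raise NotImplementedError


def to_internal(value: Any, data_type: DataType) -> Any:
    """Recursively convert a Python value into its SQL-level shape."""
    if value is None:
        return None
    if isinstance(data_type, UserDefinedType):
        # The UDT-specific step: serialize first, then convert the result
        # according to the UDT's underlying sql_type().
        return to_internal(data_type.serialize(value), data_type.sql_type())
    if isinstance(data_type, RowType):
        return tuple(
            to_internal(v, f.data_type) for v, f in zip(value, data_type.fields)
        )
    if isinstance(data_type, ArrayType):
        return [to_internal(v, data_type.element_type) for v in value]
    return value


# Hypothetical dense vector and UDT, loosely modeled on VectorUDT.
class DenseVector:
    def __init__(self, values: Sequence[float]):
        self._values = list(values)


class DenseVectorUDT(UserDefinedType):
    def sql_type(self) -> DataType:
        return RowType([
            Field("type", AtomicType("TINYINT")),
            Field("size", AtomicType("INT")),
            Field("indices", ArrayType(AtomicType("INT"))),
            Field("values", ArrayType(AtomicType("DOUBLE"))),
        ])

    def serialize(self, obj: Any) -> Any:
        if isinstance(obj, DenseVector):
            return 1, None, None, [float(v) for v in obj._values]
        raise TypeError("Cannot serialize {!r} of type {!r}".format(obj, type(obj)))


schema = RowType([
    Field("features", DenseVectorUDT()),
    Field("label", AtomicType("DOUBLE")),
    Field("weight", AtomicType("DOUBLE")),
])
row = to_internal((DenseVector([1, 2, 3, 4]), 0.0, 1.0), schema)
print(row)
{code}

Running it prints `((1, None, None, [1.0, 2.0, 3.0, 4.0]), 0.0, 1.0)`. The vector column is replaced by the four-field tuple its UDT declares, and the plain DOUBLE columns pass through unchanged.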



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
