[ https://issues.apache.org/jira/browse/FLINK-25856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo resolved FLINK-25856.
----------------------------------
Resolution: Fixed
Merged into master via 6056680e148cb32323aabb307ed6f71b3eb3aec4
Merged into release-1.14 via cfc54caf4882a7339b02d06b3c43df64feb02ad5
> Fix use of UserDefinedType in from_elements
> -------------------------------------------
>
> Key: FLINK-25856
> URL: https://issues.apache.org/jira/browse/FLINK-25856
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.15.0, 1.14.3
> Reporter: Huang Xingbo
> Assignee: Huang Xingbo
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0, 1.14.4
>
>
> If we define a new UserDefinedType and use it in `from_elements`, the call
> fails.
> {code:python}
> from pyflink.table import DataTypes
> from pyflink.table.types import UserDefinedType
>
> # SparseVector and DenseVector are assumed to be defined in the same module
> # as this UDT (see module() below).
>
> class VectorUDT(UserDefinedType):
>     @classmethod
>     def sql_type(cls):
>         return DataTypes.ROW(
>             [
>                 DataTypes.FIELD("type", DataTypes.TINYINT()),
>                 DataTypes.FIELD("size", DataTypes.INT()),
>                 DataTypes.FIELD("indices", DataTypes.ARRAY(DataTypes.INT())),
>                 DataTypes.FIELD("values", DataTypes.ARRAY(DataTypes.DOUBLE())),
>             ]
>         )
>
>     @classmethod
>     def module(cls):
>         return "pyflink.ml.core.linalg"
>
>     def serialize(self, obj):
>         # Encode a vector as a (type, size, indices, values) row matching sql_type().
>         if isinstance(obj, SparseVector):
>             indices = [int(i) for i in obj._indices]
>             values = [float(v) for v in obj._values]
>             return 0, obj.size(), indices, values
>         elif isinstance(obj, DenseVector):
>             values = [float(v) for v in obj._values]
>             return 1, None, None, values
>         else:
>             raise TypeError("Cannot serialize {!r} of type {!r}".format(obj, type(obj)))
> {code}
> {code:python}
> self.t_env.from_elements(
>     [
>         (Vectors.dense([1, 2, 3, 4]), 0., 1.),
>         (Vectors.dense([2, 2, 3, 4]), 0., 2.),
>         (Vectors.dense([3, 2, 3, 4]), 0., 3.),
>         (Vectors.dense([4, 2, 3, 4]), 0., 4.),
>         (Vectors.dense([5, 2, 3, 4]), 0., 5.),
>         (Vectors.dense([11, 2, 3, 4]), 1., 1.),
>         (Vectors.dense([12, 2, 3, 4]), 1., 2.),
>         (Vectors.dense([13, 2, 3, 4]), 1., 3.),
>         (Vectors.dense([14, 2, 3, 4]), 1., 4.),
>         (Vectors.dense([15, 2, 3, 4]), 1., 5.),
>     ],
>     DataTypes.ROW([
>         DataTypes.FIELD("features", VectorUDT()),
>         DataTypes.FIELD("label", DataTypes.DOUBLE()),
>         DataTypes.FIELD("weight", DataTypes.DOUBLE())]))
> {code}
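For checking the serialization contract outside Flink: `VectorUDT.serialize` has to turn each vector into a `(type, size, indices, values)` tuple that lines up with the `sql_type()` row, and each `features` value handed to `from_elements` is expected to go through that conversion. The framework-free sketch below imitates that per-row step. It is an illustration under stated assumptions, not the patch merged in 6056680e148cb32323aabb307ed6f71b3eb3aec4: `DenseVector` and `SparseVector` here are minimal stand-ins rather than the pyflink.ml.core.linalg classes, and `deserialize_vector` and `to_sql_row` (including its `udt_positions` parameter) are hypothetical helpers.

{code:python}
from dataclasses import dataclass
from typing import List


# Hypothetical stand-ins for the real vector classes, so this runs without PyFlink.
@dataclass
class DenseVector:
    _values: List[float]

    def size(self) -> int:
        return len(self._values)


@dataclass
class SparseVector:
    _size: int
    _indices: List[int]
    _values: List[float]

    def size(self) -> int:
        return self._size


def serialize_vector(obj):
    """Mirror VectorUDT.serialize: vector -> (type, size, indices, values)."""
    if isinstance(obj, SparseVector):
        return 0, obj.size(), [int(i) for i in obj._indices], [float(v) for v in obj._values]
    elif isinstance(obj, DenseVector):
        return 1, None, None, [float(v) for v in obj._values]
    raise TypeError("Cannot serialize {!r} of type {!r}".format(obj, type(obj)))


def deserialize_vector(datum):
    """Hypothetical inverse of serialize_vector (the UDT in the issue does not show one)."""
    kind, size, indices, values = datum
    if kind == 0:
        return SparseVector(size, list(indices), list(values))
    if kind == 1:
        return DenseVector(list(values))
    raise ValueError("Unknown vector type tag: {!r}".format(kind))


def to_sql_row(row, udt_positions, serializer):
    """Serialize only the fields whose positions are declared with the UDT.

    udt_positions is a hypothetical set of field indices typed as VectorUDT.
    """
    return tuple(serializer(v) if i in udt_positions else v for i, v in enumerate(row))


rows = [(DenseVector([1, 2, 3, 4]), 0., 1.), (SparseVector(4, [1, 3], [2., 4.]), 1., 2.)]
converted = [to_sql_row(r, {0}, serialize_vector) for r in rows]
{code}

Keeping serialize and deserialize exact inverses is what lets a vector survive the round trip through the ROW type, e.g. deserialize_vector(serialize_vector(v)) == v for both vector kinds.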
--
This message was sent by Atlassian Jira
(v8.20.1#820001)