[
https://issues.apache.org/jira/browse/FLINK-30168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641051#comment-17641051
]
Yunfeng Zhou commented on FLINK-30168:
--------------------------------------
Apart from the proposed solution in the Description section, PyFlink also
needs to change how it handles None values. For example, the following code in
PythonBridgeUtils.getPickledBytesFromJavaObject
{code:java}
if (obj == null) {
return new byte[0];
} else {
{code}
might be modified into
{code:java}
if (obj == null) {
return pickler.dumps(null);
} else {
{code}
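To illustrate the asymmetry, here is a minimal sketch using plain `pickle` as a stand-in for the pickler used on the Java side: an empty byte array (the current return value for null) cannot be unpickled at all, while a pickled None round-trips cleanly.

```python
import pickle

# Current behavior: Java returns new byte[0] for null, which the Python
# side cannot unpickle.
try:
    pickle.loads(b"")
except EOFError:
    print("empty payload: no value can be recovered")

# Proposed behavior: pickling None yields a payload that round-trips.
payload = pickle.dumps(None)
assert pickle.loads(payload) is None
```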
> PyFlink Deserialization Error with Object Array
> -----------------------------------------------
>
> Key: FLINK-30168
> URL: https://issues.apache.org/jira/browse/FLINK-30168
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.16.0, 1.15.2
> Reporter: Yunfeng Zhou
> Assignee: Xingbo Huang
> Priority: Major
>
> When attempting to collect object-array records from a DataStream in
> PyFlink, an exception like the following is thrown
> {code:python}
> data = 0, field_type = DenseVectorTypeInfo
>
>     def pickled_bytes_to_python_converter(data, field_type):
>         if isinstance(field_type, RowTypeInfo):
>             row_kind = RowKind(int.from_bytes(data[0], 'little'))
>             data = zip(list(data[1:]), field_type.get_field_types())
>             fields = []
>             for d, d_type in data:
>                 fields.append(pickled_bytes_to_python_converter(d, d_type))
>             row = Row.of_kind(row_kind, *fields)
>             return row
>         else:
> >           data = pickle.loads(data)
> E           TypeError: a bytes-like object is required, not 'int'
> {code}
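The final step of the traceback is easy to reproduce in isolation: `pickle.loads` receives an int (here 0, matching `data = 0` above) instead of a bytes-like object.

```python
import pickle

# Minimal reproduction of the failure mode: pickle.loads is handed an int
# rather than the pickled bytes it expects.
try:
    pickle.loads(0)
except TypeError as e:
    print(e)  # a bytes-like object is required, not 'int'
```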
> I found that this error occurs because PyFlink handles object arrays
> differently on the Java side and the Python side.
>
> On the Java side
> (org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject)
> {code:java}
> ...
> else if (dataType instanceof BasicArrayTypeInfo
>         || dataType instanceof PrimitiveArrayTypeInfo) {
>     // recursively deal with array elements
> } ...
> else {
>     // ObjectArrayTypeInfo falls through to here
>     TypeSerializer serializer = dataType.createSerializer(null);
>     ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
>     DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
>     serializer.serialize(obj, baosWrapper);
>     return pickler.dumps(baos.toByteArray());
> }
> {code}
>
> On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter)
> {code:python}
> ...
> elif isinstance(field_type,
>                 (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
>     element_type = field_type._element_type
>     elements = []
>     for element_bytes in data:
>         elements.append(
>             pickled_bytes_to_python_converter(element_bytes, element_type))
>     return elements
> {code}
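The mismatch can be sketched with plain `pickle` and a placeholder payload: the Java path pickles the raw serializer output of the whole array as one byte string, while the Python loop iterates over that payload expecting one pickled blob per element; iterating a bytes object yields ints, which is exactly what reaches `pickle.loads` in the traceback.

```python
import pickle

# Hypothetical payload: the Java side pickles the whole array's raw
# serializer output as ONE byte string ...
java_side_payload = pickle.dumps(b"\x00\x00\x00\x02raw-serializer-bytes")

# ... while the Python converter expects an iterable of per-element pickles.
data = pickle.loads(java_side_payload)   # -> a single bytes object
for element_bytes in data:               # iterating bytes yields ints,
    print(type(element_bytes))           # so pickle.loads(element_bytes)
    break                                # becomes pickle.loads(int) and fails
```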
>
> Thus, a possible fix for this bug is to align PyFlink's behavior between the
> Java side and the Python side.
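One way to make the two sides consistent (a hypothetical sketch, not the committed fix) is to pickle each array element individually, mirroring the BasicArrayTypeInfo/PrimitiveArrayTypeInfo path, so the existing Python-side loop receives one pickled payload per element.

```python
import pickle

# Hypothetical aligned encoding: one pickled payload per element, so the
# existing Python-side loop can call pickle.loads on each entry.
array = [1.0, 2.0, None]
payload = [pickle.dumps(element) for element in array]
decoded = [pickle.loads(element_bytes) for element_bytes in payload]
assert decoded == array
```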
--
This message was sent by Atlassian Jira
(v8.20.10#820010)