[ 
https://issues.apache.org/jira/browse/FLINK-30168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yunfeng Zhou updated FLINK-30168:
---------------------------------
    Description: 
Collecting object array records from a DataStream in PyFlink throws an 
exception like the following:
{code:python}
data = 0, field_type = DenseVectorTypeInfo

    def pickled_bytes_to_python_converter(data, field_type):
        if isinstance(field_type, RowTypeInfo):
            row_kind = RowKind(int.from_bytes(data[0], 'little'))
            data = zip(list(data[1:]), field_type.get_field_types())
            fields = []
            for d, d_type in data:
                fields.append(pickled_bytes_to_python_converter(d, d_type))
            row = Row.of_kind(row_kind, *fields)
            return row
        else:
>           data = pickle.loads(data)
E           TypeError: a bytes-like object is required, not 'int'
{code}
The error occurs because PyFlink handles object arrays differently on the 
Java side and the Python side.

 

On the Java side 
(org.apache.flink.api.common.python.PythonBridgeUtils.getPickledBytesFromJavaObject):
{code:java}
...
else if (dataType instanceof BasicArrayTypeInfo
        || dataType instanceof PrimitiveArrayTypeInfo) {
    // recursively deal with array elements
} ...
else {
    // ObjectArrayTypeInfo is here
    TypeSerializer serializer = dataType.createSerializer(null);
    ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
    DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(baos);
    serializer.serialize(obj, baosWrapper);
    return pickler.dumps(baos.toByteArray());
}
{code}
 

On the Python side (pyflink.datastream.utils.pickled_bytes_to_python_converter):
{code:python}
...
elif isinstance(field_type,
                (BasicArrayTypeInfo, PrimitiveArrayTypeInfo, ObjectArrayTypeInfo)):
    element_type = field_type._element_type
    elements = []
    for element_bytes in data:
        elements.append(
            pickled_bytes_to_python_converter(element_bytes, element_type))
    return elements
{code}
 

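The Java side therefore serializes an object array as a single byte blob, 
while the Python side expects a sequence of individually pickled elements. 
A possible fix is to align PyFlink's behavior on the Java side and the 
Python side.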
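
The mismatch can be reproduced without a Flink cluster. The following is a minimal sketch, not PyFlink's actual code: the `convert` function is a hypothetical, simplified stand-in for `pickled_bytes_to_python_converter`, and the four payload bytes are made up rather than real `TypeSerializer` output. It only illustrates why iterating over an opaque byte blob hands an `int` to `pickle.loads`.

```python
import pickle


def convert(data, is_array):
    """Hypothetical, simplified stand-in for pickled_bytes_to_python_converter."""
    data = pickle.loads(data)
    if is_array:
        # Python side: assumes every item of the array is itself pickled bytes.
        return [convert(element, False) for element in data]
    return data


# Shape the Python side expects: a pickled list of individually pickled elements.
aligned_payload = pickle.dumps([pickle.dumps(1.0), pickle.dumps(2.0)])
aligned_result = convert(aligned_payload, is_array=True)

# Shape the Java else-branch produces: one pickled blob of serializer output.
# The four bytes below are invented for illustration only.
opaque_payload = pickle.dumps(bytes([0, 0, 0, 2]))

caught = None
try:
    convert(opaque_payload, is_array=True)
except TypeError as exc:
    # Iterating over bytes yields ints, so pickle.loads receives an int.
    caught = exc
```

With the per-element payload the conversion succeeds, while the opaque blob fails with a `TypeError` of the same kind as in the traceback above. Either side could be changed to match the other; which one is preferable is left open here.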


> PyFlink Deserialization Error with Object Array
> -----------------------------------------------
>
>                 Key: FLINK-30168
>                 URL: https://issues.apache.org/jira/browse/FLINK-30168
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.15.2
>            Reporter: Yunfeng Zhou
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
