[ https://issues.apache.org/jira/browse/ARROW-17912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612284#comment-17612284 ]

Liangcai li edited comment on ARROW-17912 at 10/3/22 1:24 PM:
--------------------------------------------------------------

Thanks a lot for the review.

— Why not provide the schema on the Python side? 

A single PySpark serializer may read batches with different schemas, so it is better to
infer a "local" schema from each batch than to use one "global" schema for all the
batches it reads. [Cogroup is one such case.|https://github.com/apache/spark/blob/branch-3.3/python/pyspark/sql/pandas/serializers.py#L363]
The _CogroupUDFSerializer_ loads batches from two tables, and the two tables are
likely to have different schemas.

 

— Or else, the C++ side can explicitly write out an empty batch. But I don't 
think it makes sense to adjust behavior for all programs.

The Java writer can indeed send an empty table, and I think it makes sense to
transport an empty table when joining two tables.

Besides, I think this would be a positive change that lets C++ IPC handle more
cases, just as Java IPC does. Otherwise, could we add a config option to switch
between the two behaviors?

 

— Looks like PySpark should have the schema in ArrowStreamSerializer, it's just 
discarding it.

Similarly, a single PySpark serializer may read batches with different schemas, so
it is better to infer the "local" schema from each batch, even when it is empty.



> Arrow C++ IPC fails to send an empty table, but Arrow Java can do it.
> ---------------------------------------------------------------------
>
>                 Key: ARROW-17912
>                 URL: https://issues.apache.org/jira/browse/ARROW-17912
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++
>            Reporter: Liangcai li
>            Priority: Major
>
> My current work is on the PySpark Cogroup Pandas UDF, which involves two
> processes: the JVM one (sender) and the Python one (receiver).
> [Spark uses the Arrow Java `ArrowStreamWriter`|https://github.com/apache/spark/blob/branch-3.3/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala#L99]
> to serialize the Arrow tables sent from the JVM process to the Python
> process, and `ArrowStreamWriter` handles empty tables correctly.
> [cuDF, however, uses the Arrow C++ RecordBatchWriter|https://github.com/rapidsai/cudf/blob/branch-22.10/java/src/main/native/src/TableJni.cpp#L254]
> for the same serialization, and this leads to the error below on the Python
> side, where [PySpark calls PyArrow *Table.from_batches*|https://github.com/apache/spark/blob/branch-3.3/python/pyspark/sql/pandas/serializers.py#L366]
> to deserialize the Arrow stream.
> ```
> E   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 297, in load_stream
> E     [self.arrow_to_pandas(c) for c in pa.Table.from_batches(batch2).itercolumns()]
> E   File "pyarrow/table.pxi", line 1609, in pyarrow.lib.Table.from_batches
> E   ValueError: Must pass schema, or at least one RecordBatch
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
