Biruk Tesfaye created SPARK-57275:
-------------------------------------

             Summary: Spark Connect Python client throws on valid Arrow IPC streams containing multiple RecordBatches
                 Key: SPARK-57275
                 URL: https://issues.apache.org/jira/browse/SPARK-57275
             Project: Spark
          Issue Type: Bug
          Components: Connect
    Affects Versions: 4.0.0
            Reporter: Biruk Tesfaye


The Arrow IPC streaming format encodes a result as
{{[Schema][RecordBatch]*[EOS]}}, so a single {{arrow_batch}} message can carry
multiple RecordBatches, and {{pa.ipc.open_stream(...)}} parses all of them. The
server's {{arrow_batch.row_count}} is the total number of rows across every
RecordBatch in the message, but the Spark Connect Python client validates the
row count inside the per-batch loop:
 
{code:python}
  for batch in reader:
      num_records_in_batch += batch.num_rows
      # Checked too early: compares a partial total after each RecordBatch.
      if num_records_in_batch != b.arrow_batch.row_count:
          raise SparkConnectException(...)
      # Also double-counts: adds the running total, not just this batch's rows.
      num_records += num_records_in_batch
{code}

When a message contains more than one RecordBatch, the check fires after the
first batch, before the stream is fully consumed, and raises:

  {{SparkConnectException: Expected N rows in arrow batch but got M.}} (M < N)
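A minimal local reproduction, assuming only PyArrow is installed: the sketch writes one IPC stream containing two RecordBatches (3 + 2 rows), shows that PyArrow reads all five rows, and then runs a simplified copy of the per-batch check. {{make_ipc_stream}}, {{buggy_count}} and {{RowCountMismatch}} are hypothetical names; {{RowCountMismatch}} stands in for {{SparkConnectException}}, and the real client code differs in detail.

{code:python}
import pyarrow as pa


class RowCountMismatch(Exception):
    """Hypothetical stand-in for SparkConnectException."""


def make_ipc_stream(batch_sizes):
    """Write one RecordBatch per entry in batch_sizes into a single IPC stream."""
    schema = pa.schema([("id", pa.int64())])
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, schema) as writer:
        start = 0
        for size in batch_sizes:
            ids = pa.array(list(range(start, start + size)), type=pa.int64())
            writer.write_batch(pa.record_batch([ids], schema=schema))
            start += size
    return sink.getvalue().to_pybytes()


def buggy_count(data, expected_row_count):
    """Simplified copy of the per-batch check described in this issue."""
    num_records = 0
    num_records_in_batch = 0
    for batch in pa.ipc.open_stream(data):
        num_records_in_batch += batch.num_rows
        # Compares a partial total, so it fails after the first of several batches.
        if num_records_in_batch != expected_row_count:
            raise RowCountMismatch(
                f"Expected {expected_row_count} rows in arrow batch "
                f"but got {num_records_in_batch}."
            )
        num_records += num_records_in_batch
    return num_records


data = make_ipc_stream([3, 2])  # one IPC stream, two RecordBatches, 5 rows total
print(sum(b.num_rows for b in pa.ipc.open_stream(data)))  # 5: PyArrow reads every batch
try:
    buggy_count(data, expected_row_count=5)
except RowCountMismatch as e:
    print(e)  # Expected 5 rows in arrow batch but got 3.
{code}

A single-batch stream ({{make_ipc_stream([5])}}) passes the same check, which is why the bug only surfaces once a producer emits more than one RecordBatch per message.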

*Impact*: Any code path that produces multi-RecordBatch IPC streams (e.g. 
Arrow-native IPC buffer compression) fails to fetch results, even though the 
payload is well-formed and parseable by PyArrow.

*Fix*: Count each RecordBatch once ({{num_records += batch.num_rows}}) and
validate {{row_count}} only after the IPC stream is fully consumed. (The Scala
client in {{SparkResult.scala}} already validates after the loop and is unaffected.)
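A sketch of the proposed fix, again assuming only PyArrow. {{read_arrow_batch}} and {{RowCountMismatch}} are hypothetical names (the latter stands in for {{SparkConnectException}}) and this is not the actual client patch; it only illustrates the ordering: add {{batch.num_rows}} once per RecordBatch, and compare against {{row_count}} after the stream is exhausted.

{code:python}
import pyarrow as pa


class RowCountMismatch(Exception):
    """Hypothetical stand-in for SparkConnectException."""


def read_arrow_batch(data: bytes, expected_row_count: int) -> list:
    """Read every RecordBatch in one IPC stream, then validate the total."""
    batches = []
    num_records = 0
    for batch in pa.ipc.open_stream(data):
        num_records += batch.num_rows  # count each RecordBatch exactly once
        batches.append(batch)
    # Validate only after the whole stream has been consumed.
    if num_records != expected_row_count:
        raise RowCountMismatch(
            f"Expected {expected_row_count} rows in arrow batch but got {num_records}."
        )
    return batches


# Build one IPC stream holding two RecordBatches (3 + 2 rows).
schema = pa.schema([("id", pa.int64())])
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, schema) as writer:
    writer.write_batch(pa.record_batch([pa.array([0, 1, 2], type=pa.int64())], schema=schema))
    writer.write_batch(pa.record_batch([pa.array([3, 4], type=pa.int64())], schema=schema))
data = sink.getvalue().to_pybytes()

batches = read_arrow_batch(data, expected_row_count=5)
print(len(batches), sum(b.num_rows for b in batches))  # 2 5
{code}

Because the comparison runs once, after the loop, a well-formed multi-batch stream passes while a stream whose total disagrees with {{row_count}} still fails.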



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
