ryanschneider opened a new issue, #451:
URL: https://github.com/apache/arrow-go/issues/451

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   I'm using `flight.StreamChunksFromReader` in my `DoGetStatement` handler to 
serve chunks from a Parquet file, roughly as follows (error checks removed; 
how I obtain the `ReaderAtSeeker` is abstracted out):
   
   ```golang
   func (s *flightServer) DoGetStatement(ctx context.Context, cmd flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, error) {
        r := getParquetFile(...) // returns a ReaderAtSeeker
        pqr, err := file.NewParquetReader(r, file.WithReadProps(&parquet.ReaderProperties{
                BufferedStreamEnabled: true,
        }))
        arr, err := pqarrow.NewFileReader(
                pqr,
                pqarrow.ArrowReadProperties{
                        Parallel:  false,
                        BatchSize: batchSize,
                },
                memory.DefaultAllocator,
        )
        schema, err := arr.Schema()
        rr, err := arr.GetRecordReader(ctx, nil, nil)
        ch := make(chan flight.StreamChunk)
   
        go streamChunksFromReader(rr, ch)
        return schema, ch, nil
   }
   ```
   
   But this consistently returns `EOF` to the client _after_ sending all the 
records: the reader returned by `client.DoGet()` fails on `rdr.Read()` with 
`rpc error: code = Unknown desc = EOF` after reading the last chunk, instead 
of returning `io.EOF` as it should.
   
   I tracked this down to the trailing error check in 
`flight.StreamChunksFromReader`: 
https://github.com/apache/arrow-go/blob/main/arrow/flight/record_batch_reader.go#L235-L238
   
   It seems that the `RecordReader` returned by 
`pqarrow.FileReader.GetRecordReader` sets its internal error state to `io.EOF` 
after the last record is read. I was able to work around this using a slightly 
modified local `streamChunksFromReader` implementation:
   
   ```golang
   func streamChunksFromReader(rdr array.RecordReader, ch chan<- flight.StreamChunk) {
        defer close(ch)
        defer func() {
                if err := recover(); err != nil {
                        slog.Warn("panic while reading", slog.Any("err", err))
                        ch <- flight.StreamChunk{Err: fmt.Errorf("panic while reading: %v", err)}
                }
        }()
   
        defer rdr.Release()
        for rdr.Next() {
                rec := rdr.Record()
                rec.Retain()
                slog.Debug("record received", slog.Any("rows", rec.NumRows()), slog.Any("cols", rec.NumCols()))
                ch <- flight.StreamChunk{Data: rec}
   
                if err := rdr.Err(); err != nil {
                        slog.Warn("error while reading", slog.Any("err", err))
                        ch <- flight.StreamChunk{Err: err}
                }
        }
   
        if err := rdr.Err(); err != nil && err != io.EOF {
                slog.Warn("error after reading", slog.Any("err", err))
                ch <- flight.StreamChunk{Err: err}
        }
   }
   ```
   
   The changes are two-fold:
   
   - If `rdr.Err()` returns a non-nil error while `.Next()` returned true, always report it.
   - After `.Next()` returns false, only report non-EOF errors.
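
   The two rules above can be exercised without Arrow or Parquet. The following is a minimal sketch using only the standard library: `recordReader`, `eofReader`, `chunk`, and `drain` are hypothetical stand-ins for `array.RecordReader`, the pqarrow reader, `flight.StreamChunk`, and the streaming loop, and the `io.EOF` behavior of `eofReader` is an assumption based on what I observed, not a confirmed description of pqarrow internals.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recordReader is a hypothetical stand-in for the Next/Record/Err part of
// array.RecordReader; records are plain ints to keep the sketch small.
type recordReader interface {
	Next() bool
	Record() int
	Err() error
}

// eofReader mimics the observed behavior: once its records are exhausted,
// Next returns false and Err reports io.EOF.
type eofReader struct {
	recs []int
	pos  int
	err  error
}

func (r *eofReader) Next() bool {
	if r.pos >= len(r.recs) {
		r.err = io.EOF
		return false
	}
	r.pos++
	return true
}

func (r *eofReader) Record() int { return r.recs[r.pos-1] }
func (r *eofReader) Err() error  { return r.err }

// chunk is a hypothetical stand-in for flight.StreamChunk.
type chunk struct {
	Data int
	Err  error
}

// drain applies both rules: an error seen while Next is true is always
// forwarded; after Next returns false, only non-EOF errors are forwarded.
func drain(r recordReader) []chunk {
	var out []chunk
	for r.Next() {
		out = append(out, chunk{Data: r.Record()})
		if err := r.Err(); err != nil {
			out = append(out, chunk{Err: err})
		}
	}
	if err := r.Err(); err != nil && !errors.Is(err, io.EOF) {
		out = append(out, chunk{Err: err})
	}
	return out
}

func main() {
	for _, c := range drain(&eofReader{recs: []int{1, 2, 3}}) {
		fmt.Println(c.Data, c.Err)
	}
}
```

   With this filter, a reader that ends in `io.EOF` produces only data chunks, so the stream closes cleanly instead of ending with an error chunk.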
   
   The added logging and other minor changes were just for pinpointing the 
issue. I'm not sure whether this is a bug in `pqarrow` or in 
`flight.StreamChunksFromReader`.
   
   
   ### Component(s)
   
   Other, Parquet

