miguelpragier commented on code in PR #40090: URL: https://github.com/apache/arrow/pull/40090#discussion_r1494581000
########## go/arrow/flight/flightsql/driver/driver.go: ########## @@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context, query string, args []driv return nil, err } - rows := Rows{} - for _, endpoint := range info.Endpoint { - schema, records, err := readEndpoint(ctx, c.client, endpoint) - if err != nil { - return &rows, err - } - if rows.schema == nil { - rows.schema = schema - } - rows.records = append(rows.records, records...) - } + rows := newRows() + go rows.streamRecordset(ctx, c.client, info.Endpoint) - return &rows, nil + <-rows.initializedChan + return rows, nil } -func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint *flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) { - reader, err := client.DoGet(ctx, endpoint.GetTicket()) - if err != nil { - return nil, nil, fmt.Errorf("getting ticket failed: %w", err) - } - defer reader.Release() +func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client, endpoints []*flight.FlightEndpoint) { + defer close(r.recordChan) - schema := reader.Schema() - var records []arrow.Record - for reader.Next() { - if record := reader.Record(); record.NumRows() > 0 { - record.Retain() - records = append(records, record) - } - } + initializeOnceOnly := &sync.Once{} - if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) { - return nil, nil, err - } + defer func() { // in case of error, init anyway + initializeOnceOnly.Do(func() { r.initializedChan <- true }) + }() + + // reads each endpoint + for _, endpoint := range endpoints { + func() { // with a func() is possible to {defer reader.Release()} + reader, err := c.DoGet(ctx, endpoint.GetTicket()) + if err != nil { + r.streamError = fmt.Errorf("getting ticket failed: %w", err) + return + } + + defer reader.Release() + + r.schema = reader.Schema() + + // reads each record into a blocking channel + for reader.Next() { + record := reader.Record() + record.Retain() + + if record.NumRows() < 1 { + record.Release() + continue + } - return schema, records, nil + select { + case r.recordChan <- record: + go initializeOnceOnly.Do(func() { r.initializedChan <- true }) + + case <-ctx.Done(): + r.releaseRecord() + r.streamError = fmt.Errorf("stream recordset context timed out") Review Comment: > Should use `ctx.Err()` instead, it could be cancelled or any other situation, not necessarily timed out. ✔️ Replaced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org