mgross-ebner commented on code in PR #40090:
URL: https://github.com/apache/arrow/pull/40090#discussion_r1492689487


##########
go/arrow/flight/flightsql/driver/driver.go:
##########
@@ -461,43 +480,63 @@ func (c *Connection) QueryContext(ctx context.Context, 
query string, args []driv
                return nil, err
        }
 
-       rows := Rows{}
-       for _, endpoint := range info.Endpoint {
-               schema, records, err := readEndpoint(ctx, c.client, endpoint)
-               if err != nil {
-                       return &rows, err
-               }
-               if rows.schema == nil {
-                       rows.schema = schema
-               }
-               rows.records = append(rows.records, records...)
-       }
+       rows := newRows()
+       go rows.streamRecordset(ctx, c.client, info.Endpoint)
 
-       return &rows, nil
+       <-rows.initializedChan
 
+       return rows, nil
 }
 
-func readEndpoint(ctx context.Context, client *flightsql.Client, endpoint 
*flight.FlightEndpoint) (*arrow.Schema, []arrow.Record, error) {
-       reader, err := client.DoGet(ctx, endpoint.GetTicket())
-       if err != nil {
-               return nil, nil, fmt.Errorf("getting ticket failed: %w", err)
-       }
-       defer reader.Release()
+func (r *Rows) streamRecordset(ctx context.Context, c *flightsql.Client, 
endpoints []*flight.FlightEndpoint) {
+       defer close(r.recordChan)
 
-       schema := reader.Schema()
-       var records []arrow.Record
-       for reader.Next() {
-               if record := reader.Record(); record.NumRows() > 0 {
-                       record.Retain()
-                       records = append(records, record)
-               }
-       }
+       initializeOnceOnly := &sync.Once{}
 
-       if err := reader.Err(); err != nil && !errors.Is(err, io.EOF) {
-               return nil, nil, err
-       }
+       defer func() { // in case of error, init anyway
+               initializeOnceOnly.Do(func() { r.initializedChan <- true })
+       }()
+
+       // reads each endpoint
+       for _, endpoint := range endpoints {
+               func() { // with a func() is possible to {defer 
reader.Release()}
+                       reader, err := c.DoGet(ctx, endpoint.GetTicket())
+                       if err != nil {
+                               r.streamError = fmt.Errorf("getting ticket 
failed: %w", err)
+                               return
+                       }
+
+                       defer reader.Release()
+
+                       r.schema = reader.Schema()
+
+                       // reads each record into a blocking channel
+                       for reader.Next() {
+                               record := reader.Record()
+                               record.Retain()
+
+                               if record.NumRows() < 1 {
+                                       record.Release()
+                                       continue
+                               }
 
-       return schema, records, nil
+                               select {
+                               case r.recordChan <- record:
+                                       go initializeOnceOnly.Do(func() { 
r.initializedChan <- true })
+
+                               case <-ctx.Done():
+                                       r.releaseRecord()
+                                       r.streamError = fmt.Errorf("stream 
recordset context timed out")

Review Comment:
   @zeroshade During my tests, columns of the decimal type were returned as 
`arrow.Decimal` instead of a float64. I used the drop-in replacement to set up 
the connection to Flight SQL on Dremio via `database/sql`.
   
   I will open an issue in the ADBC project regarding the problems between 
Dremio and ADBC. If I recall correctly, the problem was that the endpoint URI, 
which is used in the LRU cache, caused data to be fetched from `0.0.0.0:32010` 
and not from the URL behind the ingress.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to