zeroshade commented on code in PR #13120:
URL: https://github.com/apache/arrow/pull/13120#discussion_r873052453


##########
go/parquet/pqarrow/column_readers.go:
##########
@@ -218,32 +222,38 @@ func (sr *structReader) GetRepLevels() ([]int16, error) {
 
 func (sr *structReader) LoadBatch(nrecords int64) error {
        var (
-               // REP -- Load batches in parallel
+               // Load batches in parallel
               // When reading structs with large numbers of columns, the serial load is very slow.
               // This is especially true when reading Cloud Storage. Loading concurrently
               // greatly improves performance.
-               wg      sync.WaitGroup
-               errchan chan error = make(chan error)
-               err     error
+               wg   sync.WaitGroup
+               np   int = 1 // default to serial
+               sem  chan interface{}
+               errs ErrBuffer
+               err  error
        )
 
-       //* Read First error from errchan and break only capturing first error
-       go func() {
-               for err = range errchan {
-                       break
-               }
-       }()
+       if sr.props.Parallel {
+               np = len(sr.children)
+       }
+       sem = make(chan interface{}, np)
        wg.Add(len(sr.children))
        for _, rdr := range sr.children {
+               sem <- nil // Acquire
                go func(r *ColumnReader) {
                        defer wg.Done()
+                       defer func() { <-sem }() // release
                        if err := r.LoadBatch(nrecords); err != nil {
-                               errchan <- err
+                               errs.Append(err)
                        }
                }(rdr)
        }
        wg.Wait() // wait for reads to complete
-       close(errchan)

Review Comment:
   Interesting approach using a channel as a semaphore. 
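   
   For reference, a buffered channel of capacity `np` behaves as a counting semaphore: a send blocks once all `np` slots are taken, and each receive frees one. The standalone sketch below illustrates the pattern from the diff under a few assumptions. `loadAll` and `errBuffer` are hypothetical names. The PR's `ErrBuffer` is not shown in this hunk, so it is assumed here to be a mutex-guarded slice. The tasks stand in for `sr.children`. The sketch also uses `chan struct{}`, the usual zero-size element type for semaphores, in place of the diff's `chan interface{}`; either works.
   
   ```go
   package main
   
   import (
   	"fmt"
   	"sync"
   )
   
   // errBuffer is a hypothetical stand-in for the PR's ErrBuffer: a
   // mutex-guarded slice that concurrent goroutines can append errors to.
   type errBuffer struct {
   	mu   sync.Mutex
   	errs []error
   }
   
   func (b *errBuffer) Append(err error) {
   	b.mu.Lock()
   	defer b.mu.Unlock()
   	b.errs = append(b.errs, err)
   }
   
   // loadAll runs every task with at most limit of them in flight at once
   // and returns all reported errors, in the order they were appended.
   func loadAll(tasks []func() error, limit int) []error {
   	if limit < 1 {
   		limit = 1 // a zero-capacity channel would block the first send with no receiver
   	}
   	var (
   		wg   sync.WaitGroup
   		sem  = make(chan struct{}, limit) // counting semaphore with limit slots
   		errs errBuffer
   	)
   	wg.Add(len(tasks))
   	for _, task := range tasks {
   		sem <- struct{}{} // acquire a slot; blocks while all slots are taken
   		go func(task func() error) {
   			defer wg.Done()
   			defer func() { <-sem }() // release the slot (runs before wg.Done)
   			if err := task(); err != nil {
   				errs.Append(err)
   			}
   		}(task)
   	}
   	wg.Wait() // all appends have happened once Wait returns
   	return errs.errs
   }
   
   func main() {
   	tasks := []func() error{
   		func() error { return nil },
   		func() error { return fmt.Errorf("column 1: load failed") },
   		func() error { return nil },
   	}
   	// limit 1 loads serially, like the diff's default np = 1.
   	for _, err := range loadAll(tasks, 1) {
   		fmt.Println(err)
   	}
   }
   ```
   
   Acquiring the slot before the `go` statement (as the diff does) also keeps the loop from starting more than `np` goroutines at once, rather than starting all of them and letting them queue on the semaphore.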
   
   I think we should try using `golang.org/x/sync/errgroup`. Using that would simplify the code quite a bit:
   
   ```go
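   g := new(errgroup.Group)
   g.SetLimit(np) // at most np loads in flight; np == 1 keeps them serial
   for _, rdr := range sr.children {
           rdr := rdr // per-iteration copy for the closure (required before Go 1.22)
           g.Go(func() error { return rdr.LoadBatch(nrecords) })
   }
   
   return g.Wait() // waits for all loads, then returns the first non-nil error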
   ```
   
   I believe this would work correctly. Thoughts?
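   
   One behavioral difference is worth weighing. `errgroup.Group.Wait` returns only the first non-nil error, whereas the diff's `ErrBuffer` appears to collect every error, so callers would see a single error when several columns fail. The sketch below shows how `SetLimit`, `Go`, and `Wait` interact without pulling in `golang.org/x/sync`. It uses a hypothetical stdlib-only `limitedGroup` that approximates errgroup's documented behavior. It is not the real package and omits `WithContext` cancellation. `loadChildren` and its `loaders` slice are likewise hypothetical stand-ins for `LoadBatch` and `sr.children`.
   
   ```go
   package main
   
   import (
   	"fmt"
   	"sync"
   )
   
   // limitedGroup is a hypothetical, stdlib-only approximation of
   // errgroup.Group's SetLimit/Go/Wait (without context cancellation).
   type limitedGroup struct {
   	wg      sync.WaitGroup
   	sem     chan struct{} // nil means no limit
   	errOnce sync.Once
   	err     error
   }
   
   // SetLimit caps the number of active goroutines; as with errgroup, a
   // negative n means no limit. Call it before the first Go.
   func (g *limitedGroup) SetLimit(n int) {
   	if n < 0 {
   		g.sem = nil
   		return
   	}
   	g.sem = make(chan struct{}, n)
   }
   
   // Go runs f in a new goroutine, first blocking until a slot is free.
   func (g *limitedGroup) Go(f func() error) {
   	if g.sem != nil {
   		g.sem <- struct{}{}
   	}
   	g.wg.Add(1)
   	go func() {
   		defer g.wg.Done()
   		if g.sem != nil {
   			defer func() { <-g.sem }() // release before wg.Done runs
   		}
   		if err := f(); err != nil {
   			g.errOnce.Do(func() { g.err = err }) // keep only the first error
   		}
   	}()
   }
   
   // Wait blocks until every f has returned, then reports the first error.
   func (g *limitedGroup) Wait() error {
   	g.wg.Wait()
   	return g.err
   }
   
   // loadChildren follows the shape of the review snippet; loaders is a
   // hypothetical stand-in for sr.children.
   func loadChildren(loaders []func() error, np int) error {
   	g := new(limitedGroup)
   	g.SetLimit(np)
   	for _, load := range loaders {
   		load := load // per-iteration copy; required before Go 1.22
   		g.Go(func() error { return load() })
   	}
   	return g.Wait()
   }
   
   func main() {
   	loaders := []func() error{
   		func() error { return nil },
   		func() error { return fmt.Errorf("column 1: load failed") },
   		func() error { return nil },
   	}
   	fmt.Println(loadChildren(loaders, 1)) // prints: column 1: load failed
   }
   ```
   
   Because `Go` blocks on the semaphore before starting a goroutine, the same loop gives serial loading with `np = 1` and full parallelism with `np = len(sr.children)`. If reporting every failing column matters, the errors would need to be gathered separately rather than relying on `Wait` alone.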



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
