zeroshade commented on code in PR #799:
URL: https://github.com/apache/iceberg-go/pull/799#discussion_r2977572922


##########
table/arrow_utils.go:
##########
@@ -1343,72 +1344,84 @@ func computeStatsPlan(sc *iceberg.Schema, props 
iceberg.Properties) (map[int]tbl
        return result, nil
 }
 
-func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
-       return func(yield func(iceberg.DataFile, error) bool) {
-               defer func() {
-                       if r := recover(); r != nil {
-                               switch e := r.(type) {
-                               case string:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %s", e))
-                               case error:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %w", e))
-                               }
-                       }
-               }()
-
-               partitionSpec, err := meta.CurrentSpec()
-               if err != nil || partitionSpec == nil {
-                       yield(nil, fmt.Errorf("%w: cannot add files without a 
current spec", err))
-
-                       return
-               }
+func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, 
err error) {

Review Comment:
   you've converted this from an iterator into something that returns the full 
slice. Could we instead use a channel for the goroutines to send on, and then 
return an iterator which reads from the channel to still allow the caller to 
utilize the iterator? 
   
   That said, I wonder if it's worth the complexity to maintain this as an 
iterator. I don't think we do anything right now on error with the files that 
were successful and instead just error out completely. So I'm not sure it's 
actually worth the extra work, thoughts?



##########
table/arrow_utils.go:
##########
@@ -1343,72 +1344,84 @@ func computeStatsPlan(sc *iceberg.Schema, props 
iceberg.Properties) (map[int]tbl
        return result, nil
 }
 
-func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
-       return func(yield func(iceberg.DataFile, error) bool) {
-               defer func() {
-                       if r := recover(); r != nil {
-                               switch e := r.(type) {
-                               case string:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %s", e))
-                               case error:
-                                       yield(nil, fmt.Errorf("error 
encountered during file conversion: %w", e))
-                               }
-                       }
-               }()
-
-               partitionSpec, err := meta.CurrentSpec()
-               if err != nil || partitionSpec == nil {
-                       yield(nil, fmt.Errorf("%w: cannot add files without a 
current spec", err))
-
-                       return
-               }
+func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta 
*MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile, 
err error) {
+       partitionSpec, err := meta.CurrentSpec()
+       if err != nil || partitionSpec == nil {
+               return nil, fmt.Errorf("%w: cannot add files without a current 
spec", err)
+       }
 
-               currentSchema, currentSpec := meta.CurrentSchema(), 
*partitionSpec
+       currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
 
-               for filePath := range paths {
-                       format := tblutils.FormatFromFileName(filePath)
-                       rdr := must(format.Open(ctx, fileIO, filePath))
-                       // TODO: take a look at this defer Close() and consider 
refactoring
-                       defer rdr.Close()
+       dataFiles := make([]iceberg.DataFile, len(filePaths))
+       eg, ctx := errgroup.WithContext(ctx)
+       eg.SetLimit(concurrency)
+       for i, filePath := range filePaths {
+               eg.Go(func() (err error) {
+                       defer func() {
+                               if r := recover(); r != nil {
+                                       switch e := r.(type) {
+                                       case string:
+                                               err = fmt.Errorf("error 
encountered during file conversion: %s", e)
+                                       case error:
+                                               err = fmt.Errorf("error 
encountered during file conversion: %w", e)
+                                       }
+                               }
+                       }()
 
-                       arrSchema := must(rdr.Schema())
+                       dataFile, err := fileToDataFile(ctx, fileIO, filePath, 
currentSchema, currentSpec, meta.props)
+                       if err != nil {
+                               return err
+                       }
+                       dataFiles[i] = dataFile
 
-                       if err := checkArrowSchemaCompat(currentSchema, 
arrSchema, false); err != nil {
-                               yield(nil, err)
+                       return nil
+               })
+       }
 
-                               return
-                       }
+       if err := eg.Wait(); err != nil {
+               return nil, err
+       }
 
-                       pathToIDSchema := currentSchema
-                       if fileHasIDs := must(VisitArrowSchema(arrSchema, 
hasIDs{})); fileHasIDs {
-                               pathToIDSchema = 
must(ArrowSchemaToIceberg(arrSchema, false, nil))
-                       }
+       return dataFiles, nil
+}
 
-                       statistics := 
format.DataFileStatsFromMeta(rdr.Metadata(), 
must(computeStatsPlan(currentSchema, meta.props)),
-                               must(format.PathToIDMapping(pathToIDSchema)))
+func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string, 
currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props 
iceberg.Properties) (iceberg.DataFile, error) {
+       format := tblutils.FormatFromFileName(filePath)
+       rdr := must(format.Open(ctx, fileIO, filePath))

Review Comment:
   if we're relying on the recover above, and using `must` here, why bother 
having the `error` in the return signature at all? Let's commit to one route or 
the other: either rely on the recover and use `must` everywhere internally 
(remove the `error` from the signature), or don't panic and rely on the `error` 
in the signature and check the error on return.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to