starpact commented on code in PR #799:
URL: https://github.com/apache/iceberg-go/pull/799#discussion_r2982615325
##########
table/arrow_utils.go:
##########
@@ -1343,72 +1344,84 @@ func computeStatsPlan(sc *iceberg.Schema, props
iceberg.Properties) (map[int]tbl
return result, nil
}
-func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta
*MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
- return func(yield func(iceberg.DataFile, error) bool) {
- defer func() {
- if r := recover(); r != nil {
- switch e := r.(type) {
- case string:
- yield(nil, fmt.Errorf("error
encountered during file conversion: %s", e))
- case error:
- yield(nil, fmt.Errorf("error
encountered during file conversion: %w", e))
- }
- }
- }()
-
- partitionSpec, err := meta.CurrentSpec()
- if err != nil || partitionSpec == nil {
- yield(nil, fmt.Errorf("%w: cannot add files without a
current spec", err))
-
- return
- }
+func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta
*MetadataBuilder, filePaths []string, concurrency int) (_ []iceberg.DataFile,
err error) {
+ partitionSpec, err := meta.CurrentSpec()
+ if err != nil || partitionSpec == nil {
+ return nil, fmt.Errorf("%w: cannot add files without a current
spec", err)
+ }
- currentSchema, currentSpec := meta.CurrentSchema(),
*partitionSpec
+ currentSchema, currentSpec := meta.CurrentSchema(), *partitionSpec
- for filePath := range paths {
- format := tblutils.FormatFromFileName(filePath)
- rdr := must(format.Open(ctx, fileIO, filePath))
- // TODO: take a look at this defer Close() and consider
refactoring
- defer rdr.Close()
+ dataFiles := make([]iceberg.DataFile, len(filePaths))
+ eg, ctx := errgroup.WithContext(ctx)
+ eg.SetLimit(concurrency)
+ for i, filePath := range filePaths {
+ eg.Go(func() (err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ switch e := r.(type) {
+ case string:
+ err = fmt.Errorf("error
encountered during file conversion: %s", e)
+ case error:
+ err = fmt.Errorf("error
encountered during file conversion: %w", e)
+ }
+ }
+ }()
- arrSchema := must(rdr.Schema())
+ dataFile, err := fileToDataFile(ctx, fileIO, filePath,
currentSchema, currentSpec, meta.props)
+ if err != nil {
+ return err
+ }
+ dataFiles[i] = dataFile
- if err := checkArrowSchemaCompat(currentSchema,
arrSchema, false); err != nil {
- yield(nil, err)
+ return nil
+ })
+ }
- return
- }
+ if err := eg.Wait(); err != nil {
+ return nil, err
+ }
- pathToIDSchema := currentSchema
- if fileHasIDs := must(VisitArrowSchema(arrSchema,
hasIDs{})); fileHasIDs {
- pathToIDSchema =
must(ArrowSchemaToIceberg(arrSchema, false, nil))
- }
+ return dataFiles, nil
+}
- statistics :=
format.DataFileStatsFromMeta(rdr.Metadata(),
must(computeStatsPlan(currentSchema, meta.props)),
- must(format.PathToIDMapping(pathToIDSchema)))
+func fileToDataFile(ctx context.Context, fileIO iceio.IO, filePath string,
currentSchema *iceberg.Schema, currentSpec iceberg.PartitionSpec, props
iceberg.Properties) (iceberg.DataFile, error) {
+ format := tblutils.FormatFromFileName(filePath)
+ rdr := must(format.Open(ctx, fileIO, filePath))
Review Comment:
Updated.
I'd prefer to use normal `error` handling, but helpers such as `statistics.ToDataFile`
use `must` internally, so I'll follow that convention for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]