alexandre-normand commented on code in PR #721:
URL: https://github.com/apache/iceberg-go/pull/721#discussion_r2850530628
##########
table/arrow_utils.go:
##########
@@ -1358,12 +1370,87 @@ func recordsToDataFiles(ctx context.Context,
rootLocation string, meta *Metadata
}
}
- return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
- } else {
- rollingDataWriters := NewWriterFactory(rootLocation, args,
meta, taskSchema, targetFileSize)
- partitionWriter := newPartitionedFanoutWriter(*currentSpec,
meta.CurrentSchema(), args.itr, &rollingDataWriters)
- workers := config.EnvConfig.MaxWorkers
+ return cw.writeFiles(ctx, rootLocation, args.fs, meta,
meta.props, nil, tasks)
+ }
+
+ factory := NewWriterFactory(rootLocation, args, meta, taskSchema,
targetFileSize)
+ partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw,
meta.CurrentSchema(), args.itr, &factory)
+ workers := config.EnvConfig.MaxWorkers
- return partitionWriter.Write(ctx, workers)
+ return partitionWriter.Write(ctx, workers)
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation
string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any,
args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
+ if args.counter == nil {
+ args.counter = internal.Counter(0)
}
+
+ defer func() {
+ if r := recover(); r != nil {
+ var err error
+ switch e := r.(type) {
+ case error:
+ err = fmt.Errorf("error encountered during
position delete file writing: %w", e)
+ default:
+ err = fmt.Errorf("error encountered during
position delete file writing: %v", e)
+ }
+ ret = func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
+ }
+ }()
+
+ if args.writeUUID == nil {
+ u := uuid.Must(uuid.NewRandom())
+ args.writeUUID = &u
+ }
+
+ targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+ WriteTargetFileSizeBytesDefault))
+
+ currentSpec, err := meta.CurrentSpec()
Review Comment:
I tried to address this, but let me know whether the approach is correct. I
extended the per-data-file mapping to include the spec ID in addition to the
partition data we were already tracking there. That mapping is now passed down,
and the position delete fanout writer uses the partition spec from the
partition context.
That's done in cc0a966778dc56f5f8383f7f993d438c6750852f. The one case I'm
unsure about is when the table's latest metadata is unpartitioned. In that
case, we were short-circuiting early and skipping the fanout writer. That
short-circuit is still in place, but I'm not sure what the ramifications are.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]