alexandre-normand commented on code in PR #721:
URL: https://github.com/apache/iceberg-go/pull/721#discussion_r2850389817


##########
table/snapshot_producers.go:
##########
@@ -628,11 +603,55 @@ func (sp *snapshotProducer) manifests() (_ 
[]iceberg.ManifestFile, err error) {
                return nil, err
        }
 
-       manifests := slices.Concat(results[0], results[1], results[2])
+       manifests := slices.Concat(addedManifests, positionDeleteManifests, 
deletedFilesManifests, existingManifests)
 
        return sp.processManifests(manifests)
 }
 
+func (sp *snapshotProducer) manifestProducer(content iceberg.ManifestContent, 
files []iceberg.DataFile, output *[]iceberg.ManifestFile) func() (err error) {
+       return func() (err error) {
+               out, path, err := sp.newManifestOutput()
+               if err != nil {
+                       return err
+               }
+               defer internal.CheckedClose(out, &err)
+
+               counter := &internal.CountingWriter{W: out}
+               currentSpec, err := sp.txn.meta.CurrentSpec()
+               if err != nil || currentSpec == nil {
+                       return fmt.Errorf("could not get current partition 
spec: %w", err)
+               }
+               wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, 
counter,
+                       *currentSpec, sp.txn.meta.CurrentSchema(),
+                       sp.snapshotID)
+               if err != nil {
+                       return err
+               }
+               defer internal.CheckedClose(wr, &err)
+
+               for _, df := range files {
+                       err := 
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
+                               nil, nil, df))
+                       if err != nil {
+                               return err
+                       }
+               }
+
+               // close the writer to force a flush and ensure counter.Count 
is accurate
+               if err := wr.Close(); err != nil {
+                       return err
+               }
+
+               mf, err := wr.ToManifestFile(path, counter.Count, 
iceberg.WithManifestFileContent(content))
+               if err != nil {
+                       return err
+               }
+               *output = []iceberg.ManifestFile{mf}
+
+               return nil
+       }
+}
+
 func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error) 
{

Review Comment:
   That's a good catch and easy enough to fix on its own but, adding a test 
case for that to validate the snapshot statistics, I encountered a bug where 
scanning tries to parse of of the positional delete files as a data file which 
causes an error. I have the summary showing the positional delete count but 
that same test does a scan to check the number of rows post-delete and that 
fails so I have to address that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to