This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b6596c  refactor: improve manifest scanning organization and 
concurrency (#252)
0b6596c is described below

commit 0b6596c8e5e092dcfe86f05b31641523416f42d4
Author: Kaushik Iska <[email protected]>
AuthorDate: Mon Jan 13 16:27:26 2025 -0600

    refactor: improve manifest scanning organization and concurrency (#252)
    
    This refactor breaks down the manifest scanning logic into more focused 
components
    in preparation for adding incremental scanners to allow reading
    changelog and diffs between two snapshots.
    
    Key changes include:
    
    - Add manifestEntries type to safely collect data and delete entries 
concurrently
    - Split manifest handling into separate fetchPartitionSpecFilteredManifests 
and
      collectManifestEntries functions for better separation of concerns
    - Replace manual goroutine management with errgroup for more robust 
concurrency
    - Add documentation comments explaining the manifest scanning process
    
    This is a step toward adding a ManifestGroup abstraction similar to the Java
    implementation that can be shared among different scanner types.
---
 table/scanner.go | 177 ++++++++++++++++++++++++++++++-------------------------
 1 file changed, 97 insertions(+), 80 deletions(-)

diff --git a/table/scanner.go b/table/scanner.go
index bfb183e..0b8c2f1 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/io"
+       "golang.org/x/sync/errgroup"
 )
 
 const ScanNoLimit = -1
@@ -82,6 +83,32 @@ func (p partitionRecord) Size() int            { return 
len(p) }
 func (p partitionRecord) Get(pos int) any      { return p[pos] }
 func (p partitionRecord) Set(pos int, val any) { p[pos] = val }
 
+// manifestEntries holds the data and positional delete entries read from 
manifests.
+type manifestEntries struct {
+       dataEntries             []iceberg.ManifestEntry
+       positionalDeleteEntries []iceberg.ManifestEntry
+       mu                      sync.Mutex
+}
+
+func newManifestEntries() *manifestEntries {
+       return &manifestEntries{
+               dataEntries:             make([]iceberg.ManifestEntry, 0),
+               positionalDeleteEntries: make([]iceberg.ManifestEntry, 0),
+       }
+}
+
+func (m *manifestEntries) addDataEntry(e iceberg.ManifestEntry) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.dataEntries = append(m.dataEntries, e)
+}
+
+func (m *manifestEntries) addPositionalDeleteEntry(e iceberg.ManifestEntry) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+       m.positionalDeleteEntries = append(m.positionalDeleteEntries, e)
+}
+
 func getPartitionRecord(dataFile iceberg.DataFile, partitionType 
*iceberg.StructType) partitionRecord {
        partitionData := dataFile.Partition()
 
@@ -259,129 +286,119 @@ func matchDeletesToData(entry iceberg.ManifestEntry, 
positionalDeletes []iceberg
        return out, nil
 }
 
-func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+// fetchPartitionSpecFilteredManifests retrieves the table's current snapshot,
+// fetches its manifest files, and applies partition-spec filters to remove 
irrelevant manifests.
+func (scan *Scan) fetchPartitionSpecFilteredManifests() 
([]iceberg.ManifestFile, error) {
        snap := scan.Snapshot()
        if snap == nil {
                return nil, nil
        }
 
-       // step 1: filter manifests using partition summaries
-       // the filter depends on the partition spec used to write the manifest 
file
-       // so create a cache of filters for each spec id
-       manifestEvaluators := 
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
+       // Fetch all manifests for the current snapshot.
        manifestList, err := snap.Manifests(scan.io)
        if err != nil {
                return nil, err
        }
 
-       // remove any manifests that we don't need to use
+       // Build per-spec manifest evaluators and filter out irrelevant 
manifests.
+       manifestEvaluators := 
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
        manifestList = slices.DeleteFunc(manifestList, func(mf 
iceberg.ManifestFile) bool {
                eval := manifestEvaluators.Get(int(mf.PartitionSpecID()))
                use, err := eval(mf)
                return !use || err != nil
        })
 
-       // step 2: filter the data files in each manifest
-       // this filter depends on the partition spec used to write the manifest 
file
-       partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
+       return manifestList, nil
+}
+
+// collectManifestEntries concurrently opens manifests, applies partition and 
metrics
+// filters, and accumulates both data entries and positional-delete entries.
+func (scan *Scan) collectManifestEntries(
+       ctx context.Context,
+       manifestList []iceberg.ManifestFile,
+) (*manifestEntries, error) {
        metricsEval, err := newInclusiveMetricsEvaluator(
-               scan.metadata.CurrentSchema(), scan.rowFilter, 
scan.caseSensitive, scan.options["include_empty_files"] == "true")
+               scan.metadata.CurrentSchema(),
+               scan.rowFilter,
+               scan.caseSensitive,
+               scan.options["include_empty_files"] == "true",
+       )
        if err != nil {
                return nil, err
        }
 
        minSeqNum := minSequenceNum(manifestList)
-       dataEntries := make([]iceberg.ManifestEntry, 0)
-       positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)
-
-       nworkers := min(scan.concurrency, len(manifestList))
-       var wg sync.WaitGroup
-
-       manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
-       entryChan := make(chan []iceberg.ManifestEntry, 20)
-
-       ctx, cancel := context.WithCancelCause(ctx)
-       for i := 0; i < nworkers; i++ {
-               wg.Add(1)
-
-               go func() {
-                       defer wg.Done()
-
-                       for {
-                               select {
-                               case m, ok := <-manifestChan:
-                                       if !ok {
-                                               return
-                                       }
-
-                                       if !scan.checkSequenceNumber(minSeqNum, 
m) {
-                                               continue
-                                       }
-
-                                       entries, err := openManifest(scan.io, m,
-                                               
partitionEvaluators.Get(int(m.PartitionSpecID())), metricsEval)
-                                       if err != nil {
-                                               cancel(err)
-                                               break
-                                       }
-
-                                       entryChan <- entries
-                               case <-ctx.Done():
-                                       return
-                               }
-                       }
-               }()
-       }
+       concurrencyLimit := min(scan.concurrency, len(manifestList))
 
-       go func() {
-               wg.Wait()
-               close(entryChan)
-       }()
+       entries := newManifestEntries()
+       g, _ := errgroup.WithContext(ctx)
+       g.SetLimit(concurrencyLimit)
 
-       for _, m := range manifestList {
-               manifestChan <- m
-       }
-       close(manifestChan)
-
-Loop:
-       for {
-               select {
-               case <-ctx.Done():
-                       return nil, context.Cause(ctx)
-               case entries, ok := <-entryChan:
-                       if !ok {
-                               // closed!
-                               break Loop
+       partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
+
+       for _, mf := range manifestList {
+               if !scan.checkSequenceNumber(minSeqNum, mf) {
+                       continue
+               }
+
+               g.Go(func() error {
+                       partEval := 
partitionEvaluators.Get(int(mf.PartitionSpecID()))
+                       manifestEntries, err := openManifest(scan.io, mf, 
partEval, metricsEval)
+                       if err != nil {
+                               return err
                        }
 
-                       for _, e := range entries {
+                       for _, e := range manifestEntries {
                                df := e.DataFile()
                                switch df.ContentType() {
                                case iceberg.EntryContentData:
-                                       dataEntries = append(dataEntries, e)
+                                       entries.addDataEntry(e)
                                case iceberg.EntryContentPosDeletes:
-                                       positionalDeleteEntries = 
append(positionalDeleteEntries, e)
+                                       entries.addPositionalDeleteEntry(e)
                                case iceberg.EntryContentEqDeletes:
-                                       return nil, fmt.Errorf("iceberg-go does 
not yet support equality deletes")
+                                       return fmt.Errorf("iceberg-go does not 
yet support equality deletes")
                                default:
-                                       return nil, fmt.Errorf("%w: unknown 
DataFileContent type (%s): %s",
+                                       return fmt.Errorf("%w: unknown 
DataFileContent type (%s): %s",
                                                ErrInvalidMetadata, 
df.ContentType(), e)
                                }
                        }
-               }
+                       return nil
+               })
+       }
+
+       if err := g.Wait(); err != nil {
+               return nil, err
+       }
+
+       return entries, nil
+}
+
+// PlanFiles orchestrates the fetching and filtering of manifests, and then
+// building a list of FileScanTasks that match the current Scan criteria.
+func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+       // Step 1: Retrieve filtered manifests based on snapshot and partition 
specs.
+       manifestList, err := scan.fetchPartitionSpecFilteredManifests()
+       if err != nil || len(manifestList) == 0 {
+               return nil, err
        }
 
-       slices.SortFunc(positionalDeleteEntries, func(a, b 
iceberg.ManifestEntry) int {
+       // Step 2: Read manifest entries concurrently, accumulating data and 
positional deletes.
+       entries, err := scan.collectManifestEntries(ctx, manifestList)
+       if err != nil {
+               return nil, err
+       }
+
+       // Step 3: Sort positional deletes and match them to data files.
+       slices.SortFunc(entries.positionalDeleteEntries, func(a, b 
iceberg.ManifestEntry) int {
                return cmp.Compare(a.SequenceNum(), b.SequenceNum())
        })
 
-       results := make([]FileScanTask, 0)
-       for _, e := range dataEntries {
-               deleteFiles, err := matchDeletesToData(e, 
positionalDeleteEntries)
+       results := make([]FileScanTask, 0, len(entries.dataEntries))
+       for _, e := range entries.dataEntries {
+               deleteFiles, err := matchDeletesToData(e, 
entries.positionalDeleteEntries)
                if err != nil {
                        return nil, err
                }
-
                results = append(results, FileScanTask{
                        File:        e.DataFile(),
                        DeleteFiles: deleteFiles,

Reply via email to