This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 0b6596c refactor: improve manifest scanning organization and
concurrency (#252)
0b6596c is described below
commit 0b6596c8e5e092dcfe86f05b31641523416f42d4
Author: Kaushik Iska <[email protected]>
AuthorDate: Mon Jan 13 16:27:26 2025 -0600
refactor: improve manifest scanning organization and concurrency (#252)
This refactor breaks the manifest scanning logic down into more focused
components, in preparation for adding incremental scanners that can read
changelogs and diffs between two snapshots.
Key changes include:
- Add manifestEntries type to safely collect data and delete entries
concurrently
- Split manifest handling into separate fetchPartitionSpecFilteredManifests
and collectManifestEntries functions for better separation of concerns
- Replace manual goroutine management with errgroup for more robust
concurrency
- Add documentation comments explaining the manifest scanning process
This is a step toward adding a ManifestGroup abstraction similar to the Java
implementation that can be shared among different scanner types.
---
table/scanner.go | 177 ++++++++++++++++++++++++++++++-------------------------
1 file changed, 97 insertions(+), 80 deletions(-)
diff --git a/table/scanner.go b/table/scanner.go
index bfb183e..0b8c2f1 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
+ "golang.org/x/sync/errgroup"
)
const ScanNoLimit = -1
@@ -82,6 +83,32 @@ func (p partitionRecord) Size() int { return
len(p) }
func (p partitionRecord) Get(pos int) any { return p[pos] }
func (p partitionRecord) Set(pos int, val any) { p[pos] = val }
+// manifestEntries holds the data and positional delete entries read from
manifests.
+type manifestEntries struct {
+ dataEntries []iceberg.ManifestEntry
+ positionalDeleteEntries []iceberg.ManifestEntry
+ mu sync.Mutex
+}
+
+func newManifestEntries() *manifestEntries {
+ return &manifestEntries{
+ dataEntries: make([]iceberg.ManifestEntry, 0),
+ positionalDeleteEntries: make([]iceberg.ManifestEntry, 0),
+ }
+}
+
+func (m *manifestEntries) addDataEntry(e iceberg.ManifestEntry) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.dataEntries = append(m.dataEntries, e)
+}
+
+func (m *manifestEntries) addPositionalDeleteEntry(e iceberg.ManifestEntry) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.positionalDeleteEntries = append(m.positionalDeleteEntries, e)
+}
+
func getPartitionRecord(dataFile iceberg.DataFile, partitionType
*iceberg.StructType) partitionRecord {
partitionData := dataFile.Partition()
@@ -259,129 +286,119 @@ func matchDeletesToData(entry iceberg.ManifestEntry,
positionalDeletes []iceberg
return out, nil
}
-func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+// fetchPartitionSpecFilteredManifests retrieves the table's current snapshot,
+// fetches its manifest files, and applies partition-spec filters to remove
irrelevant manifests.
+func (scan *Scan) fetchPartitionSpecFilteredManifests()
([]iceberg.ManifestFile, error) {
snap := scan.Snapshot()
if snap == nil {
return nil, nil
}
- // step 1: filter manifests using partition summaries
- // the filter depends on the partition spec used to write the manifest
file
- // so create a cache of filters for each spec id
- manifestEvaluators :=
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
+ // Fetch all manifests for the current snapshot.
manifestList, err := snap.Manifests(scan.io)
if err != nil {
return nil, err
}
- // remove any manifests that we don't need to use
+ // Build per-spec manifest evaluators and filter out irrelevant
manifests.
+ manifestEvaluators :=
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
manifestList = slices.DeleteFunc(manifestList, func(mf
iceberg.ManifestFile) bool {
eval := manifestEvaluators.Get(int(mf.PartitionSpecID()))
use, err := eval(mf)
return !use || err != nil
})
- // step 2: filter the data files in each manifest
- // this filter depends on the partition spec used to write the manifest
file
- partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
+ return manifestList, nil
+}
+
+// collectManifestEntries concurrently opens manifests, applies partition and
metrics
+// filters, and accumulates both data entries and positional-delete entries.
+func (scan *Scan) collectManifestEntries(
+ ctx context.Context,
+ manifestList []iceberg.ManifestFile,
+) (*manifestEntries, error) {
metricsEval, err := newInclusiveMetricsEvaluator(
- scan.metadata.CurrentSchema(), scan.rowFilter,
scan.caseSensitive, scan.options["include_empty_files"] == "true")
+ scan.metadata.CurrentSchema(),
+ scan.rowFilter,
+ scan.caseSensitive,
+ scan.options["include_empty_files"] == "true",
+ )
if err != nil {
return nil, err
}
minSeqNum := minSequenceNum(manifestList)
- dataEntries := make([]iceberg.ManifestEntry, 0)
- positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)
-
- nworkers := min(scan.concurrency, len(manifestList))
- var wg sync.WaitGroup
-
- manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
- entryChan := make(chan []iceberg.ManifestEntry, 20)
-
- ctx, cancel := context.WithCancelCause(ctx)
- for i := 0; i < nworkers; i++ {
- wg.Add(1)
-
- go func() {
- defer wg.Done()
-
- for {
- select {
- case m, ok := <-manifestChan:
- if !ok {
- return
- }
-
- if !scan.checkSequenceNumber(minSeqNum,
m) {
- continue
- }
-
- entries, err := openManifest(scan.io, m,
-
partitionEvaluators.Get(int(m.PartitionSpecID())), metricsEval)
- if err != nil {
- cancel(err)
- break
- }
-
- entryChan <- entries
- case <-ctx.Done():
- return
- }
- }
- }()
- }
+ concurrencyLimit := min(scan.concurrency, len(manifestList))
- go func() {
- wg.Wait()
- close(entryChan)
- }()
+ entries := newManifestEntries()
+ g, _ := errgroup.WithContext(ctx)
+ g.SetLimit(concurrencyLimit)
- for _, m := range manifestList {
- manifestChan <- m
- }
- close(manifestChan)
-
-Loop:
- for {
- select {
- case <-ctx.Done():
- return nil, context.Cause(ctx)
- case entries, ok := <-entryChan:
- if !ok {
- // closed!
- break Loop
+ partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
+
+ for _, mf := range manifestList {
+ if !scan.checkSequenceNumber(minSeqNum, mf) {
+ continue
+ }
+
+ g.Go(func() error {
+ partEval :=
partitionEvaluators.Get(int(mf.PartitionSpecID()))
+ manifestEntries, err := openManifest(scan.io, mf,
partEval, metricsEval)
+ if err != nil {
+ return err
}
- for _, e := range entries {
+ for _, e := range manifestEntries {
df := e.DataFile()
switch df.ContentType() {
case iceberg.EntryContentData:
- dataEntries = append(dataEntries, e)
+ entries.addDataEntry(e)
case iceberg.EntryContentPosDeletes:
- positionalDeleteEntries =
append(positionalDeleteEntries, e)
+ entries.addPositionalDeleteEntry(e)
case iceberg.EntryContentEqDeletes:
- return nil, fmt.Errorf("iceberg-go does
not yet support equality deletes")
+ return fmt.Errorf("iceberg-go does not
yet support equality deletes")
default:
- return nil, fmt.Errorf("%w: unknown
DataFileContent type (%s): %s",
+ return fmt.Errorf("%w: unknown
DataFileContent type (%s): %s",
ErrInvalidMetadata,
df.ContentType(), e)
}
}
- }
+ return nil
+ })
+ }
+
+ if err := g.Wait(); err != nil {
+ return nil, err
+ }
+
+ return entries, nil
+}
+
+// PlanFiles orchestrates the fetching and filtering of manifests, and then
+// building a list of FileScanTasks that match the current Scan criteria.
+func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+ // Step 1: Retrieve filtered manifests based on snapshot and partition
specs.
+ manifestList, err := scan.fetchPartitionSpecFilteredManifests()
+ if err != nil || len(manifestList) == 0 {
+ return nil, err
}
- slices.SortFunc(positionalDeleteEntries, func(a, b
iceberg.ManifestEntry) int {
+ // Step 2: Read manifest entries concurrently, accumulating data and
positional deletes.
+ entries, err := scan.collectManifestEntries(ctx, manifestList)
+ if err != nil {
+ return nil, err
+ }
+
+ // Step 3: Sort positional deletes and match them to data files.
+ slices.SortFunc(entries.positionalDeleteEntries, func(a, b
iceberg.ManifestEntry) int {
return cmp.Compare(a.SequenceNum(), b.SequenceNum())
})
- results := make([]FileScanTask, 0)
- for _, e := range dataEntries {
- deleteFiles, err := matchDeletesToData(e,
positionalDeleteEntries)
+ results := make([]FileScanTask, 0, len(entries.dataEntries))
+ for _, e := range entries.dataEntries {
+ deleteFiles, err := matchDeletesToData(e,
entries.positionalDeleteEntries)
if err != nil {
return nil, err
}
-
results = append(results, FileScanTask{
File: e.DataFile(),
DeleteFiles: deleteFiles,