This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3c01a046 feat(table): add Scan.ReadTasks for reading pre-planned file
scan tasks (#781)
3c01a046 is described below
commit 3c01a04602af53cb77e18c479aa3adfb654f9209
Author: Tobias Pütz <[email protected]>
AuthorDate: Tue Mar 17 17:56:56 2026 +0100
feat(table): add Scan.ReadTasks for reading pre-planned file scan tasks
(#781)
This extracts the read logic from ToArrowRecords into a new exported
ReadTasks method, which ToArrowRecords now calls after planning. This
enables callers that already have FileScanTasks to read them without
re-planning.
Previously, ToArrowRecords always went through PlanFiles first. If you
had already selected or filtered your tasks (e.g. for compaction,
selective re-reads, or custom merge strategies), there was no way to
read them directly.
---
table/scanner.go | 13 ++++++++-
table/scanner_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 92 insertions(+), 1 deletion(-)
diff --git a/table/scanner.go b/table/scanner.go
index bc3c0902..0d1a0dbd 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -505,7 +505,18 @@ func (scan *Scan) ToArrowRecords(ctx context.Context)
(*arrow.Schema, iter.Seq2[
return nil, nil, err
}
- var boundFilter iceberg.BooleanExpression
+ return scan.ReadTasks(ctx, tasks)
+}
+
+// ReadTasks reads Arrow records from a specific set of FileScanTasks,
applying the
+// scan's projection, row filters, and positional delete handling. This is
useful when
+// the caller has already planned or selected specific tasks to read.
+func (scan *Scan) ReadTasks(ctx context.Context, tasks []FileScanTask)
(*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
+ var (
+ boundFilter iceberg.BooleanExpression
+ err error
+ )
+
if scan.rowFilter != nil {
boundFilter, err =
iceberg.BindExpr(scan.metadata.CurrentSchema(), scan.rowFilter,
scan.caseSensitive)
if err != nil {
diff --git a/table/scanner_test.go b/table/scanner_test.go
index 6554c134..11c8e205 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -319,6 +319,86 @@ func (s *ScannerSuite) TestScannerRecordsDeletes() {
}
}
+func (s *ScannerSuite) TestReadTasks() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToIdentifier("default", "test_positional_mor_deletes")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident)
+ s.Require().NoError(err)
+
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable:
true},
+ }, nil)
+
+ ref := iceberg.Reference("letter")
+
+ tests := []struct {
+ name string
+ filter iceberg.BooleanExpression
+ rowLimit int64
+ expected string
+ }{
+ {
+ "all",
+ iceberg.AlwaysTrue{},
+ table.ScanNoLimit,
+ `[1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]`,
+ },
+ {"filter", iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+ iceberg.LessThan(ref, "k")), table.ScanNoLimit, `[5, 6,
7, 8, 10]`},
+ {"filter and limit",
iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+ iceberg.LessThan(ref, "k")), 1, `[5]`},
+ {"limit", nil, 3, `[1, 2, 3]`},
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(s.T())
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+
+ scan := tbl.Scan(table.WithRowFilter(tt.filter),
+ table.WithSelectedFields("number"))
+ tasks, err := scan.PlanFiles(ctx)
+ s.Require().NoError(err)
+
+ s.Len(tasks, 1)
+ s.Len(tasks[0].DeleteFiles, 1)
+
+ _, itr, err :=
scan.UseRowLimit(tt.rowLimit).ReadTasks(ctx, tasks)
+ s.Require().NoError(err)
+
+ next, stop := iter.Pull2(itr)
+ defer stop()
+
+ rec, err, valid := next()
+ s.Require().True(valid)
+ s.Require().NoError(err)
+ defer rec.Release()
+
+ s.True(expectedSchema.Equal(rec.Schema()), "expected:
%s\ngot: %s\n",
+ expectedSchema, rec.Schema())
+
+ arr, _, err := array.FromJSON(mem,
arrow.PrimitiveTypes.Int32,
+ strings.NewReader(tt.expected))
+ s.Require().NoError(err)
+ defer arr.Release()
+
+ expectedResult := array.NewRecord(expectedSchema,
[]arrow.Array{arr}, int64(arr.Len()))
+ defer expectedResult.Release()
+
+ s.True(array.RecordEqual(expectedResult, rec),
"expected: %s\ngot: %s\n", expectedResult, rec)
+
+ _, err, valid = next()
+ s.Require().NoError(err)
+ s.Require().False(valid)
+ })
+ }
+}
+
func (s *ScannerSuite) TestScannerRecordsDoubleDeletes() {
// number, letter
// (1, 'a'),