This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new ac5c84d  feat(table): Initial implementation of Reading Data (#185)
ac5c84d is described below

commit ac5c84d0a6e79ad66978e3e661eedb6f49edffda
Author: Matt Topol <[email protected]>
AuthorDate: Fri Nov 8 09:47:23 2024 +0100

    feat(table): Initial implementation of Reading Data (#185)
    
    * initial sketch
    
    * it works!
    
    * some refactoring
    
    * cleanup and comments
    
    * comments and docs
    
    * update readme
    
    * license fix
    
    * update arrow dep
    
    * fix propagation of limits from UseRef
    
    * spark sometimes writes out files differently
    
    * remove flakey test change
    
    * Update README.md
    
    Co-authored-by: Fokko Driesprong <[email protected]>
    
    * Update manifest.go
    
    Co-authored-by: Fokko Driesprong <[email protected]>
    
    * Update manifest.go
    
    Co-authored-by: Fokko Driesprong <[email protected]>
    
    * updates from feedback
    
    * skip non-position deletes
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 README.md                          |  10 +-
 go.mod                             |  14 +-
 go.sum                             |  51 +++-
 literals.go                        |   8 +-
 manifest.go                        |  84 +++---
 table/arrow_scanner.go             | 601 +++++++++++++++++++++++++++++++++++++
 table/arrow_utils.go               |  16 +-
 table/arrow_utils_test.go          |  14 +-
 table/evaluators.go                |  56 ++++
 table/internal/interfaces.go       |  68 +++++
 table/internal/parquet_files.go    | 420 ++++++++++++++++++++++++++
 table/internal/utils.go            |  76 +++++
 table/scanner.go                   | 172 +++++++----
 table/scanner_test.go              | 584 ++++++++++++++++++++++++++++++++---
 table/substrait/functions_set.yaml |  36 +++
 table/substrait/substrait.go       | 381 +++++++++++++++++++++++
 table/substrait/substrait_test.go  | 132 ++++++++
 table/table.go                     |  73 ++++-
 types.go                           |   8 +
 visitors.go                        |  53 ++++
 20 files changed, 2690 insertions(+), 167 deletions(-)

diff --git a/README.md b/README.md
index aeb249d..58dc718 100644
--- a/README.md
+++ b/README.md
@@ -55,8 +55,8 @@ $ cd iceberg-go/cmd/iceberg && go build .
 | Get Partition Specs      |     X     |
 | Get Manifests            |     X     |
 | Create New Manifests     |     X     |
-| Plan Scan                |           |
-| Plan Scan for Snapshot   |           |
+| Plan Scan                |     x     |
+| Plan Scan for Snapshot   |     x     |
 
 ### Catalog Support
 
@@ -77,9 +77,9 @@ $ cd iceberg-go/cmd/iceberg && go build .
 
 ### Read/Write Data Support
 
-* No intrinsic support for reading/writing data yet
-* Data can be manually read currently by retrieving data files via Manifests.
-* Plan to add [Apache 
Arrow](https://pkg.go.dev/github.com/apache/arrow/go/[email protected]) support 
eventually.
+* No intrinsic support for writing data yet.
+* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow-go/) 
support eventually.
+* Data can currently be read as an Arrow Table or as a stream of Arrow record 
batches.
 
 # Get in Touch
 
diff --git a/go.mod b/go.mod
index 8d2f309..c41a0a7 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ go 1.23
 toolchain go1.23.2
 
 require (
-       github.com/apache/arrow-go/v18 v18.0.1-0.20241022184425-56b794f52a9b
+       github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f
        github.com/aws/aws-sdk-go-v2 v1.32.3
        github.com/aws/aws-sdk-go-v2/config v1.28.1
        github.com/aws/aws-sdk-go-v2/credentials v1.17.42
@@ -34,8 +34,10 @@ require (
        github.com/hamba/avro/v2 v2.27.0
        github.com/pterm/pterm v0.12.79
        github.com/stretchr/testify v1.9.0
+       github.com/substrait-io/substrait-go v1.1.0
        github.com/twmb/murmur3 v1.1.8
        github.com/wolfeidau/s3iofs v1.5.2
+       golang.org/x/sync v0.8.0
        gopkg.in/yaml.v3 v3.0.1
 )
 
@@ -44,6 +46,7 @@ require (
        atomicgo.dev/keyboard v0.2.9 // indirect
        atomicgo.dev/schedule v0.1.0 // indirect
        github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // 
indirect
+       github.com/alecthomas/participle/v2 v2.1.0 // indirect
        github.com/andybalholm/brotli v1.1.1 // indirect
        github.com/apache/thrift v0.21.0 // indirect
        github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
@@ -60,8 +63,11 @@ require (
        github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 // indirect
        github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 // indirect
        github.com/containerd/console v1.0.3 // indirect
+       github.com/creasty/defaults v1.8.0 // indirect
        github.com/davecgh/go-spew v1.1.1 // indirect
+       github.com/fatih/color v1.15.0 // indirect
        github.com/goccy/go-json v0.10.3 // indirect
+       github.com/goccy/go-yaml v1.11.0 // indirect
        github.com/golang/snappy v0.0.4 // indirect
        github.com/google/flatbuffers v24.3.25+incompatible // indirect
        github.com/gookit/color v1.5.4 // indirect
@@ -70,6 +76,8 @@ require (
        github.com/klauspost/compress v1.17.11 // indirect
        github.com/klauspost/cpuid/v2 v2.2.8 // indirect
        github.com/lithammer/fuzzysearch v1.1.8 // indirect
+       github.com/mattn/go-colorable v0.1.13 // indirect
+       github.com/mattn/go-isatty v0.0.19 // indirect
        github.com/mattn/go-runewidth v0.0.15 // indirect
        github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // 
indirect
        github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
@@ -85,10 +93,12 @@ require (
        golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
        golang.org/x/mod v0.21.0 // indirect
        golang.org/x/net v0.30.0 // indirect
-       golang.org/x/sync v0.8.0 // indirect
        golang.org/x/sys v0.26.0 // indirect
        golang.org/x/term v0.25.0 // indirect
        golang.org/x/text v0.19.0 // indirect
        golang.org/x/tools v0.26.0 // indirect
        golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
+       google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240903143218-8af14fe29dc1 // indirect
+       google.golang.org/grpc v1.67.1 // indirect
+       google.golang.org/protobuf v1.35.1 // indirect
 )
diff --git a/go.sum b/go.sum
index e2aaddd..0ed534d 100644
--- a/go.sum
+++ b/go.sum
@@ -17,10 +17,18 @@ github.com/MarvinJWendt/testza v0.3.0/go.mod 
h1:eFcL4I0idjtIx8P9C6KkAuLgATNKpX4/
 github.com/MarvinJWendt/testza v0.4.2/go.mod 
h1:mSdhXiKH8sg/gQehJ63bINcCKp7RtYewEjXsvsVUPbE=
 github.com/MarvinJWendt/testza v0.5.2 
h1:53KDo64C1z/h/d/stCYCPY69bt/OSwjq5KpFNwi+zB4=
 github.com/MarvinJWendt/testza v0.5.2/go.mod 
h1:xu53QFE5sCdjtMCKk8YMQ2MnymimEctc4n3EjyIYvEY=
+github.com/alecthomas/assert/v2 v2.3.0 
h1:mAsH2wmvjsuvyBvAmCtm7zFsBlb8mIHx5ySLVdDZXL0=
+github.com/alecthomas/assert/v2 v2.3.0/go.mod 
h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
+github.com/alecthomas/participle/v2 v2.1.0 
h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwldytDj7Wrz4=
+github.com/alecthomas/participle/v2 v2.1.0/go.mod 
h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
+github.com/alecthomas/repr v0.2.0 
h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
+github.com/alecthomas/repr v0.2.0/go.mod 
h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
 github.com/andybalholm/brotli v1.1.1 
h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
 github.com/andybalholm/brotli v1.1.1/go.mod 
h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
 github.com/apache/arrow-go/v18 v18.0.1-0.20241022184425-56b794f52a9b 
h1:E79+ggEd/cv9g4iLe1B8XyBhpqlvqKbCGDmf+RPSwbA=
 github.com/apache/arrow-go/v18 v18.0.1-0.20241022184425-56b794f52a9b/go.mod 
h1:kVPeNv6eFSRhkfWZx1BIRXU6EZnp5g2NqKsuJmKXsO8=
+github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f 
h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A=
+github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f/go.mod 
h1:kVPeNv6eFSRhkfWZx1BIRXU6EZnp5g2NqKsuJmKXsO8=
 github.com/apache/thrift v0.21.0 
h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
 github.com/apache/thrift v0.21.0/go.mod 
h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
 github.com/atomicgo/cursor v0.0.1/go.mod 
h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
@@ -64,17 +72,31 @@ github.com/aws/smithy-go v1.22.0 
h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
 github.com/aws/smithy-go v1.22.0/go.mod 
h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
 github.com/containerd/console v1.0.3 
h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
 github.com/containerd/console v1.0.3/go.mod 
h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U=
+github.com/creasty/defaults v1.8.0 
h1:z27FJxCAa0JKt3utc0sCImAEb+spPucmKoOdLHvHYKk=
+github.com/creasty/defaults v1.8.0/go.mod 
h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 
h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod 
h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
+github.com/fatih/color v1.15.0/go.mod 
h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
+github.com/go-playground/locales v0.13.0 
h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
+github.com/go-playground/locales v0.13.0/go.mod 
h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
+github.com/go-playground/universal-translator v0.17.0 
h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
+github.com/go-playground/universal-translator v0.17.0/go.mod 
h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
+github.com/go-playground/validator/v10 v10.11.1 
h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ=
+github.com/go-playground/validator/v10 v10.11.1/go.mod 
h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
 github.com/goccy/go-json v0.10.3 
h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
 github.com/goccy/go-json v0.10.3/go.mod 
h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
+github.com/goccy/go-yaml v1.11.0 
h1:n7Z+zx8S9f9KgzG6KtQKf+kwqXZlLNR2F6018Dgau54=
+github.com/goccy/go-yaml v1.11.0/go.mod 
h1:H+mJrWtjPTJAHvRbV09MCK9xYwODM+wRTVFFTWckfng=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/flatbuffers v24.3.25+incompatible 
h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI=
 github.com/google/flatbuffers v24.3.25+incompatible/go.mod 
h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod 
h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -84,6 +106,8 @@ github.com/gookit/color v1.5.4 
h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
 github.com/gookit/color v1.5.4/go.mod 
h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w=
 github.com/hamba/avro/v2 v2.27.0 
h1:IAM4lQ0VzUIKBuo4qlAiLKfqALSrFC+zi1iseTtbBKU=
 github.com/hamba/avro/v2 v2.27.0/go.mod 
h1:jN209lopfllfrz7IGoZErlDz+AyUJ3vrBePQFZwYf5I=
+github.com/hexops/gotextdiff v1.0.3 
h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
+github.com/hexops/gotextdiff v1.0.3/go.mod 
h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
 github.com/json-iterator/go v1.1.12 
h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
 github.com/json-iterator/go v1.1.12/go.mod 
h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
 github.com/klauspost/asmfmt v1.3.2 
h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
@@ -95,14 +119,22 @@ github.com/klauspost/cpuid/v2 v2.0.10/go.mod 
h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuOb
 github.com/klauspost/cpuid/v2 v2.0.12/go.mod 
h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
 github.com/klauspost/cpuid/v2 v2.2.8 
h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
 github.com/klauspost/cpuid/v2 v2.2.8/go.mod 
h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
-github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod 
h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod 
h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/leodido/go-urn v1.2.0 
h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
+github.com/leodido/go-urn v1.2.0/go.mod 
h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
 github.com/lithammer/fuzzysearch v1.1.8 
h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4=
 github.com/lithammer/fuzzysearch v1.1.8/go.mod 
h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ5ajtkr5xPLts4=
+github.com/mattn/go-colorable v0.1.13 
h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod 
h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
+github.com/mattn/go-isatty v0.0.16/go.mod 
h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-isatty v0.0.19 
h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
+github.com/mattn/go-isatty v0.0.19/go.mod 
h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mattn/go-runewidth v0.0.13/go.mod 
h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
 github.com/mattn/go-runewidth v0.0.15 
h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
 github.com/mattn/go-runewidth v0.0.15/go.mod 
h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
@@ -133,6 +165,8 @@ github.com/pterm/pterm v0.12.79/go.mod 
h1:1v/gzOF1N0FsjbgTHZ1wVycRkKiatFvJSJC4IG
 github.com/rivo/uniseg v0.2.0/go.mod 
h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
 github.com/rivo/uniseg v0.4.4/go.mod 
h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/rogpeppe/go-internal v1.9.0 
h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod 
h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
 github.com/sergi/go-diff v1.2.0/go.mod 
h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -144,6 +178,8 @@ github.com/stretchr/testify v1.6.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.9.0 
h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/substrait-io/substrait-go v1.1.0 
h1:wUXoXV/ESMXgaOWu/05kvMI/UBUyhtaTRfkT5p1b5Ck=
+github.com/substrait-io/substrait-go v1.1.0/go.mod 
h1:LHzL5E0VL620yw4kBQCP+sQPmxhepPTQMDJQRbOe/T4=
 github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
 github.com/twmb/murmur3 v1.1.8/go.mod 
h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
 github.com/wolfeidau/s3iofs v1.5.2 
h1:2dmzSxdrSY29GsILVheJqBbURVQX3KglggSBtVWCYj4=
@@ -160,6 +196,8 @@ github.com/zeebo/xxh3 v1.0.2 
h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
 github.com/zeebo/xxh3 v1.0.2/go.mod 
h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
+golang.org/x/crypto v0.28.0/go.mod 
h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 
h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod 
h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
 golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod 
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@@ -186,7 +224,9 @@ golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod 
h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
 golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -214,9 +254,16 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 
h1:+cNy6SZtPcJQH3LJVLOSm
 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod 
h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
 gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0=
 gonum.org/v1/gonum v0.15.1/go.mod 
h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 
h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
+google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240903143218-8af14fe29dc1/go.mod 
h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
+google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
+google.golang.org/grpc v1.67.1/go.mod 
h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
+google.golang.org/protobuf v1.35.1 
h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
+google.golang.org/protobuf v1.35.1/go.mod 
h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c 
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod 
h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/literals.go b/literals.go
index 2e16d02..be4bcd9 100644
--- a/literals.go
+++ b/literals.go
@@ -151,8 +151,12 @@ func LiteralFromBytes(typ Type, data []byte) (Literal, 
error) {
                return v, err
        case FixedType:
                if len(data) != t.Len() {
-                       return nil, fmt.Errorf("%w: expected length %d for type 
%s, got %d",
-                               ErrInvalidBinSerialization, t.Len(), t, 
len(data))
+                       // looks like some writers will write a prefix of the 
fixed length
+                       // for lower/upper bounds instead of the full length. 
so let's pad
+                       // it out to the full length if unpacking a fixed 
length literal
+                       padded := make([]byte, t.Len())
+                       copy(padded, data)
+                       data = padded
                }
                var v FixedLiteral
                err := v.UnmarshalBinary(data)
diff --git a/manifest.go b/manifest.go
index e14d56e..700702b 100644
--- a/manifest.go
+++ b/manifest.go
@@ -363,7 +363,7 @@ func (m *manifestFileV2) FetchEntries(fs iceio.IO, 
discardDeleted bool) ([]Manif
        return fetchManifestEntries(m, fs, discardDeleted)
 }
 
-func getFieldIDMap(sc avro.Schema) map[string]int {
+func getFieldIDMap(sc avro.Schema) (map[string]int, map[int]avro.LogicalType) {
        getField := func(rs *avro.RecordSchema, name string) *avro.Field {
                for _, f := range rs.Fields() {
                        if f.Name() == name {
@@ -374,19 +374,31 @@ func getFieldIDMap(sc avro.Schema) map[string]int {
        }
 
        result := make(map[string]int)
+       logicalTypes := make(map[int]avro.LogicalType)
        entryField := getField(sc.(*avro.RecordSchema), "data_file")
        partitionField := getField(entryField.Type().(*avro.RecordSchema), 
"partition")
 
        for _, field := range 
partitionField.Type().(*avro.RecordSchema).Fields() {
                if fid, ok := field.Prop("field-id").(float64); ok {
                        result[field.Name()] = int(fid)
+                       avroTyp := field.Type()
+                       if us, ok := avroTyp.(*avro.UnionSchema); ok {
+                               for _, t := range us.Types() {
+                                       avroTyp = t
+                               }
+                       }
+
+                       if ps, ok := avroTyp.(*avro.PrimitiveSchema); ok && 
ps.Logical() != nil {
+                               logicalTypes[int(fid)] = ps.Logical().Type()
+                       }
                }
        }
-       return result
+       return result, logicalTypes
 }
 
 type hasFieldToIDMap interface {
        setFieldNameToIDMap(map[string]int)
+       setFieldIDToLogicalTypeMap(map[int]avro.LogicalType)
 }
 
 func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) 
([]ManifestEntry, error) {
@@ -407,7 +419,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, 
discardDeleted bool) ([]M
                return nil, err
        }
 
-       fieldNameToID := getFieldIDMap(sc)
+       fieldNameToID, fieldIDToLogicalType := getFieldIDMap(sc)
        isVer1, isFallback := true, false
        if string(metadata["format-version"]) == "2" {
                isVer1 = false
@@ -447,6 +459,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, 
discardDeleted bool) ([]M
                        tmp.inheritSeqNum(m)
                        if fieldToIDMap, ok := 
tmp.DataFile().(hasFieldToIDMap); ok {
                                fieldToIDMap.setFieldNameToIDMap(fieldNameToID)
+                               
fieldToIDMap.setFieldIDToLogicalTypeMap(fieldIDToLogicalType)
                        }
                        results = append(results, tmp)
                }
@@ -627,52 +640,29 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, 
V]) map[K]V {
        return out
 }
 
-func avroPartitionData(input map[string]any) map[string]any {
-       // hambra/avro/v2 will unmarshal a map[string]any such that
-       // each entry will actually be a map[string]any with the key being
-       // the avro type, not the field name.
-       //
-       // This means that partition data that looks like this:
-       //
-       //  [{"field-id": 1000, "name": "ts", "type": {"type": "int", 
"logicalType": "date"}}]
-       //
-       // Becomes:
-       //
-       //  map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
-       //
-       // so we need to simplify our map and make the partition data handling 
easier
+func avroPartitionData(input map[string]any, nameToID map[string]int, 
logicalTypes map[int]avro.LogicalType) map[string]any {
        out := make(map[string]any)
        for k, v := range input {
-               switch v := v.(type) {
-               case map[string]any:
-                       for typeName, val := range v {
-                               switch typeName {
-                               case "int.date":
-                                       out[k] = 
Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 
24).Seconds()))
-                               case "int.time-millis":
-                                       out[k] = 
Time(val.(time.Duration).Microseconds())
-                               case "long.time-micros":
-                                       out[k] = 
Time(val.(time.Duration).Microseconds())
-                               case "long.timestamp-millis":
-                                       out[k] = 
Timestamp(val.(time.Time).UTC().UnixMicro())
-                               case "long.timestamp-micros":
-                                       out[k] = 
Timestamp(val.(time.Time).UTC().UnixMicro())
-                               case "bytes.decimal":
-                                       // not implemented yet
-                               case "fixed.decimal":
-                                       // not implemented yet
+               if id, ok := nameToID[k]; ok {
+                       if logical, ok := logicalTypes[id]; ok {
+                               switch logical {
+                               case avro.Date:
+                                       out[k] = 
Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 
24).Seconds()))
+                               case avro.TimeMillis:
+                                       out[k] = 
Time(v.(time.Duration).Milliseconds())
+                               case avro.TimeMicros:
+                                       out[k] = 
Time(v.(time.Duration).Microseconds())
+                               case avro.TimestampMillis:
+                                       out[k] = 
Timestamp(v.(time.Time).UTC().UnixMilli())
+                               case avro.TimestampMicros:
+                                       out[k] = 
Timestamp(v.(time.Time).UTC().UnixMicro())
                                default:
-                                       out[k] = val
+                                       out[k] = v
                                }
-                       }
-               default:
-                       switch v := v.(type) {
-                       case time.Time:
-                               out[k] = Timestamp(v.UTC().UnixMicro())
-                       default:
-                               out[k] = v
+                               continue
                        }
                }
+               out[k] = v
        }
        return out
 }
@@ -708,7 +698,8 @@ type dataFile struct {
        // not used for anything yet, but important to maintain the information
        // for future development and updates such as when we get to writes,
        // and scan planning
-       fieldNameToID map[string]int
+       fieldNameToID        map[string]int
+       fieldIDToLogicalType map[int]avro.LogicalType
 
        initMaps sync.Once
 }
@@ -722,11 +713,14 @@ func (d *dataFile) initializeMapData() {
                d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
                d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
                d.upperBoundMap = avroColMapToMap(d.UpperBounds)
-               d.PartitionData = avroPartitionData(d.PartitionData)
+               d.PartitionData = avroPartitionData(d.PartitionData, 
d.fieldNameToID, d.fieldIDToLogicalType)
        })
 }
 
 func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m 
}
+func (d *dataFile) setFieldIDToLogicalTypeMap(m map[int]avro.LogicalType) {
+       d.fieldIDToLogicalType = m
+}
 
 func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
 func (d *dataFile) FilePath() string                  { return d.Path }
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
new file mode 100644
index 0000000..814ae77
--- /dev/null
+++ b/table/arrow_scanner.go
@@ -0,0 +1,601 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "io"
+       "iter"
+       "runtime"
+       "strconv"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/internal"
+       "github.com/apache/iceberg-go/table/substrait"
+       "github.com/substrait-io/substrait-go/expr"
+       "golang.org/x/sync/errgroup"
+)
+
+const (
+       ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+)
+
+type positionDeletes = []*arrow.Chunked
+type perFilePosDeletes = map[string]positionDeletes
+
+func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks 
[]FileScanTask) (perFilePosDeletes, error) {
+       var (
+               deletesPerFile = make(perFilePosDeletes)
+               uniqueDeletes  = make(map[string]iceberg.DataFile)
+               err            error
+       )
+
+       for _, t := range tasks {
+               for _, d := range t.DeleteFiles {
+                       if d.ContentType() != iceberg.EntryContentPosDeletes {
+                               continue
+                       }
+
+                       if _, ok := uniqueDeletes[d.FilePath()]; !ok {
+                               uniqueDeletes[d.FilePath()] = d
+                       }
+               }
+       }
+
+       if len(uniqueDeletes) == 0 {
+               return deletesPerFile, nil
+       }
+
+       g, ctx := errgroup.WithContext(ctx)
+       g.SetLimit(runtime.NumCPU())
+
+       perFileChan := make(chan map[string]*arrow.Chunked, runtime.NumCPU())
+       go func() {
+               defer close(perFileChan)
+               for _, v := range uniqueDeletes {
+                       g.Go(func() error {
+                               deletes, err := readDeletes(ctx, fs, v)
+                               if deletes != nil {
+                                       perFileChan <- deletes
+                               }
+                               return err
+                       })
+               }
+
+               err = g.Wait()
+       }()
+
+       for deletes := range perFileChan {
+               for file, arr := range deletes {
+                       deletesPerFile[file] = append(deletesPerFile[file], arr)
+               }
+       }
+
+       return deletesPerFile, err
+}
+
+func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) 
(map[string]*arrow.Chunked, error) {
+       src, err := internal.GetFile(ctx, fs, dataFile, true)
+       if err != nil {
+               return nil, err
+       }
+
+       rdr, err := src.GetReader(ctx)
+       if err != nil {
+               return nil, err
+       }
+       defer rdr.Close()
+
+       tbl, err := rdr.ReadTable(ctx)
+       if err != nil {
+               return nil, err
+       }
+       defer tbl.Release()
+
+       tbl, err = array.UnifyTableDicts(compute.GetAllocator(ctx), tbl)
+       if err != nil {
+               return nil, err
+       }
+       defer tbl.Release()
+
+       filePathCol := 
tbl.Column(tbl.Schema().FieldIndices("file_path")[0]).Data()
+       posCol := tbl.Column(tbl.Schema().FieldIndices("pos")[0]).Data()
+       dict := 
filePathCol.Chunk(0).(*array.Dictionary).Dictionary().(*array.String)
+
+       results := make(map[string]*arrow.Chunked)
+       for i := 0; i < dict.Len(); i++ {
+               v := dict.Value(i)
+
+               mask, err := compute.CallFunction(ctx, "equal", nil,
+                       compute.NewDatumWithoutOwning(filePathCol), 
compute.NewDatum(v))
+               if err != nil {
+                       return nil, err
+               }
+               defer mask.Release()
+
+               filtered, err := compute.Filter(ctx, 
compute.NewDatumWithoutOwning(posCol),
+                       mask, *compute.DefaultFilterOptions())
+               if err != nil {
+                       return nil, err
+               }
+
+               results[v] = filtered.(*compute.ChunkedDatum).Value
+       }
+       return results, nil
+}
+
+type set[T comparable] map[T]struct{}
+
+func combinePositionalDeletes(mem memory.Allocator, deletes set[int64], start, 
end int64) arrow.Array {
+       bldr := array.NewInt64Builder(mem)
+       defer bldr.Release()
+
+       for i := start; i < end; i++ {
+               if _, ok := deletes[i]; !ok {
+                       bldr.Append(i)
+               }
+       }
+       return bldr.NewArray()
+}
+
+type recProcessFn func(arrow.Record) (arrow.Record, error)
+
+func processPositionalDeletes(ctx context.Context, deletes set[int64]) 
recProcessFn {
+       nextIdx, mem := int64(0), compute.GetAllocator(ctx)
+       return func(r arrow.Record) (arrow.Record, error) {
+               defer r.Release()
+
+               currentIdx := nextIdx
+               nextIdx += r.NumRows()
+
+               indices := combinePositionalDeletes(mem, deletes, currentIdx, 
nextIdx)
+               defer indices.Release()
+
+               out, err := compute.Take(ctx, *compute.DefaultTakeOptions(),
+                       compute.NewDatumWithoutOwning(r), 
compute.NewDatumWithoutOwning(indices))
+               if err != nil {
+                       return nil, err
+               }
+
+               return out.(*compute.RecordDatum).Value, nil
+       }
+}
+
+func filterRecords(ctx context.Context, recordFilter expr.Expression) 
recProcessFn {
+       return func(rec arrow.Record) (arrow.Record, error) {
+               defer rec.Release()
+
+               input := compute.NewDatumWithoutOwning(rec)
+               mask, err := exprs.ExecuteScalarExpression(ctx, rec.Schema(), 
recordFilter, input)
+               if err != nil {
+                       return nil, err
+               }
+               defer mask.Release()
+
+               result, err := compute.Filter(ctx, input, mask, 
*compute.DefaultFilterOptions())
+               if err != nil {
+                       return nil, err
+               }
+
+               return result.(*compute.RecordDatum).Value, nil
+       }
+}
+
+type arrowScan struct {
+       fs              iceio.IO
+       metadata        Metadata
+       projectedSchema *iceberg.Schema
+       boundRowFilter  iceberg.BooleanExpression
+       caseSensitive   bool
+       rowLimit        int64
+       options         iceberg.Properties
+
+       useLargeTypes bool
+}
+
+func (as *arrowScan) projectedFieldIDs() (set[int], error) {
+       idset := set[int]{}
+       for _, field := range as.projectedSchema.Fields() {
+               switch field.Type.(type) {
+               case *iceberg.MapType, *iceberg.ListType:
+               default:
+                       idset[field.ID] = struct{}{}
+               }
+       }
+
+       if as.boundRowFilter != nil {
+               extracted, err := iceberg.ExtractFieldIDs(as.boundRowFilter)
+               if err != nil {
+                       return nil, err
+               }
+
+               for _, id := range extracted {
+                       idset[id] = struct{}{}
+               }
+       }
+
+       return idset, nil
+}
+
+type enumeratedRecord struct {
+       Record internal.Enumerated[arrow.Record]
+       Task   internal.Enumerated[FileScanTask]
+       Err    error
+}
+
+func (as *arrowScan) prepareToRead(ctx context.Context, file iceberg.DataFile) 
(*iceberg.Schema, []int, internal.FileReader, error) {
+       ids, err := as.projectedFieldIDs()
+       if err != nil {
+               return nil, nil, nil, err
+       }
+
+       src, err := internal.GetFile(ctx, as.fs, file, false)
+       if err != nil {
+               return nil, nil, nil, err
+       }
+
+       rdr, err := src.GetReader(ctx)
+       if err != nil {
+               return nil, nil, nil, err
+       }
+
+       fileSchema, colIndices, err := rdr.PrunedSchema(ids)
+       if err != nil {
+               rdr.Close()
+               return nil, nil, nil, err
+       }
+
+       iceSchema, err := ArrowSchemaToIceberg(fileSchema, false, nil)
+       if err != nil {
+               rdr.Close()
+               return nil, nil, nil, err
+       }
+
+       return iceSchema, colIndices, rdr, nil
+}
+
+func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema 
*iceberg.Schema) (recProcessFn, bool, error) {
+       if as.boundRowFilter == nil || 
as.boundRowFilter.Equals(iceberg.AlwaysTrue{}) {
+               return nil, false, nil
+       }
+
+       translatedFilter, err := 
iceberg.TranslateColumnNames(as.boundRowFilter, fileSchema)
+       if err != nil {
+               return nil, false, err
+       }
+
+       if translatedFilter.Equals(iceberg.AlwaysFalse{}) {
+               return nil, true, nil
+       }
+
+       translatedFilter, err = iceberg.BindExpr(fileSchema, translatedFilter, 
as.caseSensitive)
+       if err != nil {
+               return nil, false, err
+       }
+
+       if !translatedFilter.Equals(iceberg.AlwaysTrue{}) {
+               extSet, recordFilter, err := substrait.ConvertExpr(fileSchema, 
translatedFilter, as.caseSensitive)
+               if err != nil {
+                       return nil, false, err
+               }
+
+               ctx = exprs.WithExtensionIDSet(ctx, 
exprs.NewExtensionSetDefault(*extSet))
+               return filterRecords(ctx, recordFilter), false, nil
+       }
+
+       return nil, false, nil
+}
+
+func (as *arrowScan) processRecords(
+       ctx context.Context,
+       task internal.Enumerated[FileScanTask],
+       fileSchema *iceberg.Schema,
+       rdr internal.FileReader,
+       columns []int,
+       pipeline []recProcessFn,
+       out chan<- enumeratedRecord) (err error) {
+
+       var (
+               testRowGroups any
+               recRdr        array.RecordReader
+       )
+
+       switch task.Value.File.FileFormat() {
+       case iceberg.ParquetFile:
+               testRowGroups, err = 
newParquetRowGroupStatsEvaluator(fileSchema, as.boundRowFilter, false)
+               if err != nil {
+                       return err
+               }
+       }
+
+       recRdr, err = rdr.GetRecords(ctx, columns, testRowGroups)
+       if err != nil {
+               return err
+       }
+       defer recRdr.Release()
+
+       var (
+               idx  int
+               prev arrow.Record
+       )
+
+       for recRdr.Next() {
+               if prev != nil {
+                       out <- enumeratedRecord{Record: 
internal.Enumerated[arrow.Record]{
+                               Value: prev, Index: idx, Last: false}, Task: 
task}
+                       idx++
+               }
+
+               prev = recRdr.Record()
+               prev.Retain()
+
+               for _, f := range pipeline {
+                       prev, err = f(prev)
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+
+       if prev != nil {
+               out <- enumeratedRecord{Record: 
internal.Enumerated[arrow.Record]{
+                       Value: prev, Index: idx, Last: true}, Task: task}
+       }
+
+       if recRdr.Err() != nil && recRdr.Err() != io.EOF {
+               err = recRdr.Err()
+       }
+
+       return err
+}
+
+func (as *arrowScan) recordsFromTask(ctx context.Context, task 
internal.Enumerated[FileScanTask], out chan<- enumeratedRecord, 
positionalDeletes positionDeletes) (err error) {
+       defer func() {
+               if err != nil {
+                       out <- enumeratedRecord{Task: task, Err: err}
+               }
+       }()
+
+       var (
+               rdr        internal.FileReader
+               iceSchema  *iceberg.Schema
+               colIndices []int
+               filterFunc recProcessFn
+               dropFile   bool
+       )
+
+       iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
+       if err != nil {
+               return
+       }
+
+       pipeline := make([]recProcessFn, 0, 2)
+       if len(positionalDeletes) > 0 {
+               deletes := set[int64]{}
+               for _, chunk := range positionalDeletes {
+                       for _, a := range chunk.Chunks() {
+                               for _, v := range 
a.(*array.Int64).Int64Values() {
+                                       deletes[v] = struct{}{}
+                               }
+                       }
+               }
+
+               pipeline = append(pipeline, processPositionalDeletes(ctx, 
deletes))
+       }
+
+       filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
+       if err != nil {
+               return
+       }
+
+       if dropFile {
+               var emptySchema *arrow.Schema
+               emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil, 
false, as.useLargeTypes)
+               if err != nil {
+                       return err
+               }
+               out <- enumeratedRecord{Task: task, Record: 
internal.Enumerated[arrow.Record]{
+                       Value: array.NewRecord(emptySchema, nil, 0), Index: 0, 
Last: true}}
+               return
+       }
+
+       if filterFunc != nil {
+               pipeline = append(pipeline, filterFunc)
+       }
+
+       pipeline = append(pipeline, func(r arrow.Record) (arrow.Record, error) {
+               defer r.Release()
+               return ToRequestedSchema(as.projectedSchema, iceSchema, r, 
false, false, as.useLargeTypes)
+       })
+
+       err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, 
pipeline, out)
+       return
+}
+
+func createIterator(ctx context.Context, numWorkers uint, records <-chan 
enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelFunc, 
rowLimit int64) iter.Seq2[arrow.Record, error] {
+       isBeforeAny := func(batch enumeratedRecord) bool {
+               return batch.Task.Index < 0
+       }
+
+       sequenced := internal.MakeSequencedChan(uint(numWorkers), records,
+               func(left, right *enumeratedRecord) bool {
+                       switch {
+                       case isBeforeAny(*left):
+                               return true
+                       case isBeforeAny(*right):
+                               return false
+                       case left.Err != nil || right.Err != nil:
+                               return true
+                       case left.Task.Index == right.Task.Index:
+                               return left.Record.Index < right.Record.Index
+                       default:
+                               return left.Task.Index < right.Task.Index
+                       }
+               }, func(prev, next *enumeratedRecord) bool {
+                       switch {
+                       case isBeforeAny(*prev):
+                               return next.Task.Index == 0 && 
next.Record.Index == 0
+                       case next.Err != nil:
+                               return true
+                       case prev.Task.Index == next.Task.Index:
+                               return next.Record.Index == prev.Record.Index+1
+                       default:
+                               return next.Task.Index == prev.Task.Index+1 &&
+                                       prev.Record.Last && next.Record.Index 
== 0
+                       }
+               }, enumeratedRecord{Task: 
internal.Enumerated[FileScanTask]{Index: -1}})
+
+       totalRowCount := int64(0)
+       return func(yield func(arrow.Record, error) bool) {
+               defer func() {
+                       for rec := range sequenced {
+                               if rec.Record.Value != nil {
+                                       rec.Record.Value.Release()
+                               }
+                       }
+
+                       for _, v := range deletesPerFile {
+                               for _, chunk := range v {
+                                       chunk.Release()
+                               }
+                       }
+               }()
+
+               defer cancel()
+
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case enum, ok := <-sequenced:
+                               if !ok {
+                                       return
+                               }
+
+                               if enum.Err != nil {
+                                       yield(nil, enum.Err)
+                                       return
+                               }
+
+                               rec := enum.Record.Value
+                               if rowLimit > 0 {
+                                       if totalRowCount >= rowLimit {
+                                               rec.Release()
+                                               return
+                                       } else if totalRowCount+rec.NumRows() > 
rowLimit {
+                                               defer rec.Release()
+                                               rec = rec.NewSlice(0, 
rowLimit-totalRowCount)
+                                       }
+                               }
+
+                               if rec.NumRows() == 0 {
+                                       // skip empty records
+                                       continue
+                               }
+
+                               if !yield(rec, nil) {
+                                       return
+                               }
+                               totalRowCount += rec.NumRows()
+                               if rowLimit > 0 && totalRowCount >= rowLimit {
+                                       return
+                               }
+                       }
+               }
+       }
+}
+
+func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, 
tasks []FileScanTask, deletesPerFile perFilePosDeletes) iter.Seq2[arrow.Record, 
error] {
+       extSet := substrait.NewExtensionSet()
+
+       ctx, cancel := context.WithCancel(exprs.WithExtensionIDSet(ctx, extSet))
+       taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))
+
+       // numWorkers := 1
+       numWorkers := min(runtime.NumCPU(), len(tasks))
+       records := make(chan enumeratedRecord, numWorkers)
+
+       var wg sync.WaitGroup
+       wg.Add(numWorkers)
+       for i := 0; i < numWorkers; i++ {
+               go func() {
+                       defer wg.Done()
+                       for {
+                               select {
+                               case <-ctx.Done():
+                                       return
+                               case task, ok := <-taskChan:
+                                       if !ok {
+                                               return
+                                       }
+
+                                       if err := as.recordsFromTask(ctx, task, 
records,
+                                               
deletesPerFile[task.Value.File.FilePath()]); err != nil {
+                                               cancel()
+                                               return
+                                       }
+                               }
+                       }
+               }()
+       }
+
+       go func() {
+               for i, t := range tasks {
+                       taskChan <- internal.Enumerated[FileScanTask]{
+                               Value: t, Index: i, Last: i == len(tasks)-1}
+               }
+               close(taskChan)
+
+               wg.Wait()
+               close(records)
+       }()
+
+       return createIterator(ctx, uint(numWorkers), records, deletesPerFile,
+               cancel, as.rowLimit)
+}
+
+func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) 
(*arrow.Schema, iter.Seq2[arrow.Record, error], error) {
+       var err error
+       as.useLargeTypes, err = 
strconv.ParseBool(as.options.Get(ScanOptionArrowUseLargeTypes, "false"))
+       if err != nil {
+               as.useLargeTypes = false
+       }
+
+       resultSchema, err := SchemaToArrowSchema(as.projectedSchema, nil, 
false, as.useLargeTypes)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       if as.rowLimit == 0 {
+               return resultSchema, func(yield func(arrow.Record, error) bool) 
{}, nil
+       }
+
+       deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       return resultSchema, as.recordBatchesFromTasksAndDeletes(ctx, tasks, 
deletesPerFile), nil
+}
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index b44d06f..310fbfb 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -465,6 +465,7 @@ func ensureSmallArrowTypes(dt arrow.DataType) 
(arrow.DataType, error) {
 type convertToArrow struct {
        metadata        map[string]string
        includeFieldIDs bool
+       useLargeTypes   bool
 }
 
 func (c convertToArrow) Schema(_ *iceberg.Schema, result arrow.Field) 
arrow.Field {
@@ -554,11 +555,17 @@ func (c convertToArrow) VisitTimestamp() arrow.Field {
 }
 
 func (c convertToArrow) VisitString() arrow.Field {
-       return arrow.Field{Type: arrow.BinaryTypes.LargeString}
+       if c.useLargeTypes {
+               return arrow.Field{Type: arrow.BinaryTypes.LargeString}
+       }
+       return arrow.Field{Type: arrow.BinaryTypes.String}
 }
 
 func (c convertToArrow) VisitBinary() arrow.Field {
-       return arrow.Field{Type: arrow.BinaryTypes.LargeBinary}
+       if c.useLargeTypes {
+               return arrow.Field{Type: arrow.BinaryTypes.LargeBinary}
+       }
+       return arrow.Field{Type: arrow.BinaryTypes.Binary}
 }
 
 func (c convertToArrow) VisitUUID() arrow.Field {
@@ -569,8 +576,9 @@ func (c convertToArrow) VisitUUID() arrow.Field {
 // is non-nil, it will be included as the top-level metadata in the schema. If 
includeFieldIDs
 // is true, then each field of the schema will contain a metadata key 
PARQUET:field_id set to
 // the field id from the iceberg schema.
-func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, 
includeFieldIDs bool) (*arrow.Schema, error) {
-       top, err := iceberg.Visit(sc, convertToArrow{metadata: metadata, 
includeFieldIDs: includeFieldIDs})
+func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, 
includeFieldIDs, useLargeTypes bool) (*arrow.Schema, error) {
+       top, err := iceberg.Visit(sc, convertToArrow{metadata: metadata,
+               includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes})
        if err != nil {
                return nil, err
        }
diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go
index 9c15985..4dc12f2 100644
--- a/table/arrow_utils_test.go
+++ b/table/arrow_utils_test.go
@@ -76,16 +76,16 @@ func TestArrowToIceberg(t *testing.T) {
                {&arrow.TimestampType{Unit: arrow.Microsecond}, 
iceberg.PrimitiveTypes.Timestamp, true, ""},
                {&arrow.TimestampType{Unit: arrow.Nanosecond}, nil, false, 
"'ns' timestamp precision not supported"},
                {&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: 
"US/Pacific"}, nil, false, "unsupported arrow type for conversion - 
timestamp[us, tz=US/Pacific]"},
-               {arrow.BinaryTypes.String, iceberg.PrimitiveTypes.String, 
false, ""},
-               {arrow.BinaryTypes.LargeString, iceberg.PrimitiveTypes.String, 
true, ""},
+               {arrow.BinaryTypes.String, iceberg.PrimitiveTypes.String, true, 
""},
+               {arrow.BinaryTypes.LargeString, iceberg.PrimitiveTypes.String, 
false, ""},
                {arrow.BinaryTypes.StringView, nil, false, "unsupported arrow 
type for conversion - string_view"},
-               {arrow.BinaryTypes.Binary, iceberg.PrimitiveTypes.Binary, 
false, ""},
-               {arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, 
true, ""},
+               {arrow.BinaryTypes.Binary, iceberg.PrimitiveTypes.Binary, true, 
""},
+               {arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, 
false, ""},
                {arrow.BinaryTypes.BinaryView, nil, false, "unsupported arrow 
type for conversion - binary_view"},
                {extensions.NewUUIDType(), iceberg.PrimitiveTypes.UUID, true, 
""},
                {arrow.StructOf(arrow.Field{
                        Name:     "foo",
-                       Type:     arrow.BinaryTypes.LargeString,
+                       Type:     arrow.BinaryTypes.String,
                        Nullable: true,
                        Metadata: arrow.MetadataFrom(map[string]string{
                                table.ArrowParquetFieldIDKey: "1", 
table.ArrowFieldDocKey: "foo doc",
@@ -137,7 +137,7 @@ func TestArrowToIceberg(t *testing.T) {
                }, false, ""},
                {arrow.MapOfWithMetadata(arrow.PrimitiveTypes.Int32,
                        fieldIDMeta("1"),
-                       arrow.BinaryTypes.LargeString, fieldIDMeta("2")),
+                       arrow.BinaryTypes.String, fieldIDMeta("2")),
                        &iceberg.MapType{
                                KeyID: 1, KeyType: iceberg.PrimitiveTypes.Int32,
                                ValueID: 2, ValueType: 
iceberg.PrimitiveTypes.String, ValueRequired: false,
@@ -311,7 +311,7 @@ func TestArrowSchemaRoundTripConversion(t *testing.T) {
        }
 
        for _, tt := range schemas {
-               sc, err := table.SchemaToArrowSchema(tt, nil, true)
+               sc, err := table.SchemaToArrowSchema(tt, nil, true, false)
                require.NoError(t, err)
 
                ice, err := table.ArrowSchemaToIceberg(sc, false, nil)
diff --git a/table/evaluators.go b/table/evaluators.go
index 1458c98..e8b6bda 100644
--- a/table/evaluators.go
+++ b/table/evaluators.go
@@ -22,6 +22,7 @@ import (
        "math"
        "slices"
 
+       "github.com/apache/arrow-go/v18/parquet/metadata"
        "github.com/apache/iceberg-go"
        "github.com/google/uuid"
 )
@@ -682,6 +683,21 @@ func newInclusiveMetricsEvaluator(s *iceberg.Schema, expr 
iceberg.BooleanExpress
        }).Eval, nil
 }
 
+func newParquetRowGroupStatsEvaluator(fileSchema *iceberg.Schema, expr 
iceberg.BooleanExpression,
+       includeEmptyFiles bool) (func(*metadata.RowGroupMetaData, []int) (bool, 
error), error) {
+
+       rewritten, err := iceberg.RewriteNotExpr(expr)
+       if err != nil {
+               return nil, err
+       }
+
+       return (&inclusiveMetricsEval{
+               st:                fileSchema.AsStruct(),
+               includeEmptyFiles: includeEmptyFiles,
+               expr:              rewritten,
+       }).TestRowGroup, nil
+}
+
 type inclusiveMetricsEval struct {
        metricsEvaluator
 
@@ -690,6 +706,46 @@ type inclusiveMetricsEval struct {
        includeEmptyFiles bool
 }
 
+func (m *inclusiveMetricsEval) TestRowGroup(rgmeta *metadata.RowGroupMetaData, 
colIndices []int) (bool, error) {
+       if !m.includeEmptyFiles && rgmeta.NumRows() == 0 {
+               return rowsCannotMatch, nil
+       }
+
+       m.valueCounts = make(map[int]int64)
+       m.nullCounts = make(map[int]int64)
+       m.nanCounts = nil
+       m.lowerBounds = make(map[int][]byte)
+       m.upperBounds = make(map[int][]byte)
+
+       for _, c := range colIndices {
+               colMeta, err := rgmeta.ColumnChunk(c)
+               if err != nil {
+                       return false, err
+               }
+
+               if ok, err := colMeta.StatsSet(); !ok || err != nil {
+                       continue
+               }
+
+               stats, err := colMeta.Statistics()
+               if err != nil {
+                       return false, err
+               }
+
+               fieldID := int(stats.Descr().SchemaNode().FieldID())
+               m.valueCounts[fieldID] = stats.NumValues()
+               if stats.HasNullCount() {
+                       m.nullCounts[fieldID] = stats.NullCount()
+               }
+               if stats.HasMinMax() {
+                       m.lowerBounds[fieldID] = stats.EncodeMin()
+                       m.upperBounds[fieldID] = stats.EncodeMax()
+               }
+       }
+
+       return iceberg.VisitExpr(m.expr, m)
+}
+
 func (m *inclusiveMetricsEval) Eval(file iceberg.DataFile) (bool, error) {
        if !m.includeEmptyFiles && file.Count() == 0 {
                return rowsCannotMatch, nil
diff --git a/table/internal/interfaces.go b/table/internal/interfaces.go
new file mode 100644
index 0000000..9d837fe
--- /dev/null
+++ b/table/internal/interfaces.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+)
+
+// GetFile opens the given file using the provided file system.
+//
+// The FileSource interface allows abstracting away the underlying file format
+// while providing utilties to read the file as Arrow record batches.
+func GetFile(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile, 
isPosDeletes bool) (FileSource, error) {
+       switch dataFile.FileFormat() {
+       case iceberg.ParquetFile:
+               return &ParquetFileSource{
+                       mem:  compute.GetAllocator(ctx),
+                       fs:   fs,
+                       file: dataFile,
+               }, nil
+       default:
+               return nil, fmt.Errorf("%w: only parquet format is implemented, 
got %s",
+                       iceberg.ErrNotImplemented, dataFile.FileFormat())
+       }
+}
+
+type FileSource interface {
+       GetReader(context.Context) (FileReader, error)
+}
+
+type FileReader interface {
+       io.Closer
+
+       // PrunedSchema takes in the list of projected field IDs and returns 
the arrow schema
+       // that represents the underlying file schema with only the projected 
fields. It also
+       // returns the indexes of the projected columns to allow reading *only* 
the needed
+       // columns.
+       PrunedSchema(projectedIDs map[int]struct{}) (*arrow.Schema, []int, 
error)
+       // GetRecords returns a record reader for only the provided columns 
(using nil will read
+       // all of the columns of the underlying file.) The `tester` is a 
function that can be used,
+       // if non-nil, to filter aspects of the file such as skipping row 
groups in a parquet file.
+       GetRecords(ctx context.Context, cols []int, tester any) 
(array.RecordReader, error)
+       // ReadTable reads the entire file and returns it as an arrow table.
+       ReadTable(context.Context) (arrow.Table, error)
+}
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
new file mode 100644
index 0000000..1c5d38e
--- /dev/null
+++ b/table/internal/parquet_files.go
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "context"
+       "fmt"
+       "slices"
+       "strconv"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/file"
+       "github.com/apache/arrow-go/v18/parquet/metadata"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+)
+
+type ParquetFileSource struct {
+       mem  memory.Allocator
+       fs   iceio.IO
+       file iceberg.DataFile
+}
+
+type wrapPqArrowReader struct {
+       *pqarrow.FileReader
+}
+
+func (w wrapPqArrowReader) Close() error {
+       return w.ParquetReader().Close()
+}
+
+func (w wrapPqArrowReader) PrunedSchema(projectedIDs map[int]struct{}) 
(*arrow.Schema, []int, error) {
+       return pruneParquetColumns(w.Manifest, projectedIDs, false)
+}
+
+func (w wrapPqArrowReader) GetRecords(ctx context.Context, cols []int, tester 
any) (array.RecordReader, error) {
+       var (
+               testRg func(*metadata.RowGroupMetaData, []int) (bool, error)
+               ok     bool
+       )
+
+       if tester != nil {
+               testRg, ok = tester.(func(*metadata.RowGroupMetaData, []int) 
(bool, error))
+               if !ok {
+                       return nil, fmt.Errorf("%w: invalid tester function", 
iceberg.ErrInvalidArgument)
+               }
+       }
+
+       var rgList []int
+       if testRg != nil {
+               rgList = make([]int, 0)
+               fileMeta, numRg := w.ParquetReader().MetaData(), 
w.ParquetReader().NumRowGroups()
+               for rg := 0; rg < numRg; rg++ {
+                       rgMeta := fileMeta.RowGroup(rg)
+                       use, err := testRg(rgMeta, cols)
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       if use {
+                               rgList = append(rgList, rg)
+                       }
+               }
+       }
+
+       return w.GetRecordReader(ctx, cols, rgList)
+}
+
+func (pfs *ParquetFileSource) GetReader(ctx context.Context) (FileReader, 
error) {
+       pf, err := pfs.fs.Open(pfs.file.FilePath())
+       if err != nil {
+               return nil, err
+       }
+
+       rdr, err := file.NewParquetReader(pf,
+               file.WithReadProps(parquet.NewReaderProperties(pfs.mem)))
+       if err != nil {
+               return nil, err
+       }
+
+       // TODO: grab these from the context
+       arrProps := pqarrow.ArrowReadProperties{
+               Parallel:  true,
+               BatchSize: 1 << 17,
+       }
+
+       if pfs.file.ContentType() == iceberg.EntryContentPosDeletes {
+               // for dictionary for filepath col
+               arrProps.SetReadDict(0, true)
+       }
+
+       fr, err := pqarrow.NewFileReader(rdr, arrProps, pfs.mem)
+       if err != nil {
+               return nil, err
+       }
+
+       return wrapPqArrowReader{fr}, nil
+}
+
+type manifestVisitor[T any] interface {
+       Manifest(*pqarrow.SchemaManifest, []T) T
+       Field(pqarrow.SchemaField, T) T
+       Struct(pqarrow.SchemaField, []T) T
+       List(pqarrow.SchemaField, T) T
+       Map(pqarrow.SchemaField, T, T) T
+       Primitive(pqarrow.SchemaField) T
+}
+
+func visitParquetManifest[T any](manifest *pqarrow.SchemaManifest, visitor 
manifestVisitor[T]) (res T, err error) {
+       if manifest == nil {
+               err = fmt.Errorf("%w: cannot visit nil manifest", 
iceberg.ErrInvalidArgument)
+               return
+       }
+
+       defer func() {
+               if r := recover(); r != nil {
+                       err = fmt.Errorf("%s", r)
+               }
+       }()
+
+       results := make([]T, len(manifest.Fields))
+       for i, f := range manifest.Fields {
+               res := visitManifestField(f, visitor)
+               results[i] = visitor.Field(f, res)
+       }
+       return visitor.Manifest(manifest, results), nil
+}
+
+func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor 
manifestVisitor[T]) T {
+       results := make([]T, len(field.Children))
+
+       for i, f := range field.Children {
+               results[i] = visitManifestField(f, visitor)
+       }
+
+       return visitor.Struct(field, results)
+}
+
+func visitManifestList[T any](field pqarrow.SchemaField, visitor 
manifestVisitor[T]) T {
+       elemField := field.Children[0]
+       res := visitManifestField(elemField, visitor)
+       return visitor.List(field, res)
+}
+
+func visitManifestMap[T any](field pqarrow.SchemaField, visitor 
manifestVisitor[T]) T {
+       kvfield := field.Children[0]
+       keyField, valField := kvfield.Children[0], kvfield.Children[1]
+
+       return visitor.Map(field, visitManifestField(keyField, visitor), 
visitManifestField(valField, visitor))
+}
+
+func visitManifestField[T any](field pqarrow.SchemaField, visitor 
manifestVisitor[T]) T {
+       switch field.Field.Type.(type) {
+       case *arrow.StructType:
+               return visitParquetManifestStruct(field, visitor)
+       case *arrow.MapType:
+               return visitManifestMap(field, visitor)
+       case arrow.ListLikeType:
+               return visitManifestList(field, visitor)
+       default:
+               return visitor.Primitive(field)
+       }
+}
+
+func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected 
map[int]struct{}, selectFullTypes bool) (*arrow.Schema, []int, error) {
+       visitor := &pruneParquetSchema{
+               selected:  selected,
+               manifest:  manifest,
+               fullTypes: selectFullTypes,
+               indices:   []int{},
+       }
+
+       result, err := visitParquetManifest[arrow.Field](manifest, visitor)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       return arrow.NewSchema(result.Type.(*arrow.StructType).Fields(), 
&result.Metadata),
+               visitor.indices, nil
+}
+
+func getFieldID(f arrow.Field) *int {
+       if !f.HasMetadata() {
+               return nil
+       }
+
+       fieldIDStr, ok := f.Metadata.GetValue("PARQUET:field_id")
+       if !ok {
+               return nil
+       }
+
+       id, err := strconv.Atoi(fieldIDStr)
+       if err != nil {
+               return nil
+       }
+
+       return &id
+}
+
+type pruneParquetSchema struct {
+       selected  map[int]struct{}
+       fullTypes bool
+       manifest  *pqarrow.SchemaManifest
+
+       indices []int
+}
+
+func (p *pruneParquetSchema) fieldID(field arrow.Field) int {
+       if id := getFieldID(field); id != nil {
+               return *id
+       }
+
+       panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing 
field_id",
+               iceberg.ErrInvalidSchema, field))
+}
+
+func (p *pruneParquetSchema) Manifest(manifest *pqarrow.SchemaManifest, fields 
[]arrow.Field) arrow.Field {
+       finalFields := slices.DeleteFunc(fields, func(f arrow.Field) bool { 
return f.Type == nil })
+       result := arrow.Field{
+               Type: arrow.StructOf(finalFields...),
+       }
+       if manifest.SchemaMeta != nil {
+               result.Metadata = *manifest.SchemaMeta
+       }
+
+       return result
+}
+
+func (p *pruneParquetSchema) Struct(field pqarrow.SchemaField, children 
[]arrow.Field) arrow.Field {
+       selected, fields := []arrow.Field{}, field.Children
+       sameType := true
+
+       for i, t := range children {
+               field := fields[i]
+               if arrow.TypeEqual(field.Field.Type, t.Type) {
+                       selected = append(selected, *field.Field)
+               } else if t.Type == nil {
+                       sameType = false
+                       // type has changed, create a new field with the 
projected type
+                       selected = append(selected, arrow.Field{
+                               Name:     field.Field.Name,
+                               Type:     field.Field.Type,
+                               Nullable: field.Field.Nullable,
+                               Metadata: field.Field.Metadata,
+                       })
+               }
+       }
+
+       if len(selected) > 0 {
+               if len(selected) == len(fields) && sameType {
+                       // nothing changed, return the original
+                       return *field.Field
+               } else {
+                       result := *field.Field
+                       result.Type = arrow.StructOf(selected...)
+                       return result
+               }
+       }
+
+       return arrow.Field{}
+}
+
+func (p *pruneParquetSchema) Field(field pqarrow.SchemaField, result 
arrow.Field) arrow.Field {
+       _, ok := p.selected[p.fieldID(*field.Field)]
+       if !ok {
+               if result.Type != nil {
+                       return result
+               }
+
+               return arrow.Field{}
+       }
+
+       if p.fullTypes {
+               return *field.Field
+       }
+
+       if _, ok := field.Field.Type.(*arrow.StructType); ok {
+               result := *field.Field
+               result.Type = p.projectSelectedStruct(result.Type)
+               return result
+       }
+
+       if !field.IsLeaf() {
+               panic(fmt.Errorf("cannot explicitly project list or map types"))
+       }
+
+       p.indices = append(p.indices, field.ColIndex)
+       return *field.Field
+}
+
+func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult 
arrow.Field) arrow.Field {
+       _, ok := p.selected[p.fieldID(*field.Children[0].Field)]
+       if !ok {
+               if elemResult.Type != nil {
+                       result := *field.Field
+                       result.Type = 
p.projectList(field.Field.Type.(arrow.ListLikeType), elemResult.Type)
+                       return result
+               }
+
+               return arrow.Field{}
+       }
+
+       if p.fullTypes {
+               return *field.Field
+       }
+
+       _, ok = field.Children[0].Field.Type.(*arrow.StructType)
+       if field.Children[0].Field.Type != nil && ok {
+               result := *field.Field
+               projected := p.projectSelectedStruct(elemResult.Type)
+               result.Type = 
p.projectList(field.Field.Type.(arrow.ListLikeType), projected)
+               return result
+       }
+
+       if !field.Children[0].IsLeaf() {
+               panic(fmt.Errorf("cannot explicitly project list or map types"))
+       }
+
+       p.indices = append(p.indices, field.ColIndex)
+       return *field.Field
+}
+
+func (p *pruneParquetSchema) Map(field pqarrow.SchemaField, keyResult, 
valResult arrow.Field) arrow.Field {
+       _, ok := p.selected[p.fieldID(*field.Children[0].Children[1].Field)]
+       if !ok {
+               if valResult.Type != nil {
+                       result := *field.Field
+                       result.Type = 
p.projectMap(field.Field.Type.(*arrow.MapType), valResult.Type)
+                       return result
+               }
+
+               if _, ok = 
p.selected[p.fieldID(*field.Children[0].Children[1].Field)]; ok {
+                       return *field.Field
+               }
+
+               return arrow.Field{}
+       }
+
+       if p.fullTypes {
+               return *field.Field
+       }
+
+       _, ok = field.Children[0].Children[1].Field.Type.(*arrow.StructType)
+       if ok {
+               result := *field.Field
+               projected := p.projectSelectedStruct(valResult.Type)
+               result.Type = p.projectMap(field.Field.Type.(*arrow.MapType), 
projected)
+               return result
+       }
+
+       if !field.Children[0].Children[1].IsLeaf() {
+               panic("cannot explicitly project list or map types")
+       }
+
+       return *field.Field
+}
+
+func (p *pruneParquetSchema) Primitive(field pqarrow.SchemaField) arrow.Field {
+       return arrow.Field{}
+}
+
+func (p *pruneParquetSchema) projectSelectedStruct(projected arrow.DataType) 
*arrow.StructType {
+       if projected == nil {
+               return &arrow.StructType{}
+       }
+
+       if ty, ok := projected.(*arrow.StructType); ok {
+               return ty
+       }
+
+       panic("expected a struct")
+}
+
+func (p *pruneParquetSchema) projectList(listType arrow.ListLikeType, 
elemResult arrow.DataType) arrow.ListLikeType {
+       if arrow.TypeEqual(listType.Elem(), elemResult) {
+               return listType
+       }
+
+       origField := listType.ElemField()
+       origField.Type = elemResult
+
+       switch listType.(type) {
+       case *arrow.ListType:
+               return arrow.ListOfField(origField)
+       case *arrow.LargeListType:
+               return arrow.LargeListOfField(origField)
+       case *arrow.ListViewType:
+               return arrow.ListViewOfField(origField)
+       }
+
+       n := listType.(*arrow.FixedSizeListType).Len()
+       return arrow.FixedSizeListOfField(n, origField)
+}
+
+func (p *pruneParquetSchema) projectMap(m *arrow.MapType, valResult 
arrow.DataType) *arrow.MapType {
+       if arrow.TypeEqual(m.ItemType(), valResult) {
+               return m
+       }
+
+       return arrow.MapOf(m.KeyType(), valResult)
+}
diff --git a/table/internal/utils.go b/table/internal/utils.go
new file mode 100644
index 0000000..6efc1bc
--- /dev/null
+++ b/table/internal/utils.go
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "container/heap"
+
+// Enumerated is a quick way to represent a sequenced value that can
+// be processed in parallel and then needs to be reordered.
+type Enumerated[T any] struct {
+       Value T
+       Index int
+       Last  bool
+}
+
+// a simple priority queue
+type pqueue[T any] struct {
+       queue   []*T
+       compare func(a, b *T) bool
+}
+
+func (pq *pqueue[T]) Len() int { return len(pq.queue) }
+func (pq *pqueue[T]) Less(i, j int) bool {
+       return pq.compare(pq.queue[i], pq.queue[j])
+}
+func (pq *pqueue[T]) Swap(i, j int) {
+       pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
+}
+
+func (pq *pqueue[T]) Push(x any) {
+       pq.queue = append(pq.queue, x.(*T))
+}
+
+func (pq *pqueue[T]) Pop() any {
+       old := pq.queue
+       n := len(old)
+
+       item := old[n-1]
+       old[n-1] = nil
+       pq.queue = old[0 : n-1]
+       return item
+}
+
+// MakeSequencedChan creates a channel that outputs values in a given order
+// based on the comesAfter and isNext functions. The values are read in from
+// the provided source and then re-ordered before being sent to the output.
+func MakeSequencedChan[T any](bufferSize uint, source <-chan T, comesAfter, 
isNext func(a, b *T) bool, initial T) <-chan T {
+       pq := pqueue[T]{queue: make([]*T, 0), compare: comesAfter}
+       heap.Init(&pq)
+       previous, out := &initial, make(chan T, bufferSize)
+       go func() {
+               defer close(out)
+               for val := range source {
+                       heap.Push(&pq, &val)
+                       for pq.Len() > 0 && isNext(previous, pq.queue[0]) {
+                               previous = heap.Pop(&pq).(*T)
+                               out <- *previous
+                       }
+               }
+       }()
+       return out
+}
diff --git a/table/scanner.go b/table/scanner.go
index ea33372..f7aab11 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -21,14 +21,19 @@ import (
        "cmp"
        "context"
        "fmt"
+       "iter"
        "runtime"
        "slices"
        "sync"
 
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/io"
 )
 
+const ScanNoLimit = -1
+
 type keyDefaultMap[K comparable, V any] struct {
        defaultFactory func(K) V
        data           map[K]V
@@ -52,15 +57,15 @@ func (k *keyDefaultMap[K, V]) Get(key K) V {
        return v
 }
 
-func newKeyDefaultMap[K comparable, V any](factory func(K) V) keyDefaultMap[K, 
V] {
-       return keyDefaultMap[K, V]{
+func newKeyDefaultMap[K comparable, V any](factory func(K) V) 
*keyDefaultMap[K, V] {
+       return &keyDefaultMap[K, V]{
                data:           make(map[K]V),
                defaultFactory: factory,
        }
 }
 
-func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error)) 
keyDefaultMap[K, V] {
-       return keyDefaultMap[K, V]{
+func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error)) 
*keyDefaultMap[K, V] {
+       return &keyDefaultMap[K, V]{
                data: make(map[K]V),
                defaultFactory: func(k K) V {
                        v, err := factory(k)
@@ -124,51 +129,51 @@ type Scan struct {
        caseSensitive  bool
        snapshotID     *int64
        options        iceberg.Properties
+       limit          int64
+
+       partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
+}
 
-       partitionFilters keyDefaultMap[int, iceberg.BooleanExpression]
+func (scan *Scan) UseRowLimit(n int64) *Scan {
+       out := *scan
+       out.limit = n
+       return &out
 }
 
-func (s *Scan) UseRef(name string) (*Scan, error) {
-       if s.snapshotID != nil {
+func (scan *Scan) UseRef(name string) (*Scan, error) {
+       if scan.snapshotID != nil {
                return nil, fmt.Errorf("%w: cannot override ref, already set 
snapshot id %d",
-                       iceberg.ErrInvalidArgument, *s.snapshotID)
+                       iceberg.ErrInvalidArgument, *scan.snapshotID)
        }
 
-       if snap := s.metadata.SnapshotByName(name); snap != nil {
-               out := &Scan{
-                       metadata:       s.metadata,
-                       io:             s.io,
-                       rowFilter:      s.rowFilter,
-                       selectedFields: s.selectedFields,
-                       caseSensitive:  s.caseSensitive,
-                       snapshotID:     &snap.SnapshotID,
-                       options:        s.options,
-               }
+       if snap := scan.metadata.SnapshotByName(name); snap != nil {
+               out := *scan
+               out.snapshotID = &snap.SnapshotID
                out.partitionFilters = 
newKeyDefaultMapWrapErr(out.buildPartitionProjection)
 
-               return out, nil
+               return &out, nil
        }
 
        return nil, fmt.Errorf("%w: cannot scan unknown ref=%s", 
iceberg.ErrInvalidArgument, name)
 }
 
-func (s *Scan) Snapshot() *Snapshot {
-       if s.snapshotID != nil {
-               return s.metadata.SnapshotByID(*s.snapshotID)
+func (scan *Scan) Snapshot() *Snapshot {
+       if scan.snapshotID != nil {
+               return scan.metadata.SnapshotByID(*scan.snapshotID)
        }
-       return s.metadata.CurrentSnapshot()
+       return scan.metadata.CurrentSnapshot()
 }
 
-func (s *Scan) Projection() (*iceberg.Schema, error) {
-       curSchema := s.metadata.CurrentSchema()
-       if s.snapshotID != nil {
-               snap := s.metadata.SnapshotByID(*s.snapshotID)
+func (scan *Scan) Projection() (*iceberg.Schema, error) {
+       curSchema := scan.metadata.CurrentSchema()
+       if scan.snapshotID != nil {
+               snap := scan.metadata.SnapshotByID(*scan.snapshotID)
                if snap == nil {
-                       return nil, fmt.Errorf("%w: snapshot not found: %d", 
ErrInvalidOperation, *s.snapshotID)
+                       return nil, fmt.Errorf("%w: snapshot not found: %d", 
ErrInvalidOperation, *scan.snapshotID)
                }
 
                if snap.SchemaID != nil {
-                       for _, schema := range s.metadata.Schemas() {
+                       for _, schema := range scan.metadata.Schemas() {
                                if schema.ID == *snap.SchemaID {
                                        curSchema = schema
                                        break
@@ -177,33 +182,33 @@ func (s *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
-       if slices.Contains(s.selectedFields, "*") {
+       if slices.Contains(scan.selectedFields, "*") {
                return curSchema, nil
        }
 
-       return curSchema.Select(s.caseSensitive, s.selectedFields...)
+       return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
 }
 
-func (s *Scan) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
-       project := newInclusiveProjection(s.metadata.CurrentSchema(),
-               s.metadata.PartitionSpecs()[specID], true)
-       return project(s.rowFilter)
+func (scan *Scan) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
+       project := newInclusiveProjection(scan.metadata.CurrentSchema(),
+               scan.metadata.PartitionSpecs()[specID], true)
+       return project(scan.rowFilter)
 }
 
-func (s *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) 
(bool, error), error) {
-       spec := s.metadata.PartitionSpecs()[specID]
-       return newManifestEvaluator(spec, s.metadata.CurrentSchema(),
-               s.partitionFilters.Get(specID), s.caseSensitive)
+func (scan *Scan) buildManifestEvaluator(specID int) 
(func(iceberg.ManifestFile) (bool, error), error) {
+       spec := scan.metadata.PartitionSpecs()[specID]
+       return newManifestEvaluator(spec, scan.metadata.CurrentSchema(),
+               scan.partitionFilters.Get(specID), scan.caseSensitive)
 }
 
-func (s *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) 
(bool, error) {
-       spec := s.metadata.PartitionSpecs()[specID]
-       partType := spec.PartitionType(s.metadata.CurrentSchema())
+func (scan *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) 
(bool, error) {
+       spec := scan.metadata.PartitionSpecs()[specID]
+       partType := spec.PartitionType(scan.metadata.CurrentSchema())
        partSchema := iceberg.NewSchema(0, partType.FieldList...)
-       partExpr := s.partitionFilters.Get(specID)
+       partExpr := scan.partitionFilters.Get(specID)
 
        return func(d iceberg.DataFile) (bool, error) {
-               fn, err := iceberg.ExpressionEvaluator(partSchema, partExpr, 
s.caseSensitive)
+               fn, err := iceberg.ExpressionEvaluator(partSchema, partExpr, 
scan.caseSensitive)
                if err != nil {
                        return false, err
                }
@@ -212,7 +217,7 @@ func (s *Scan) buildPartitionEvaluator(specID int) 
func(iceberg.DataFile) (bool,
        }
 }
 
-func (s *Scan) checkSequenceNumber(minSeqNum int64, manifest 
iceberg.ManifestFile) bool {
+func (scan *Scan) checkSequenceNumber(minSeqNum int64, manifest 
iceberg.ManifestFile) bool {
        return manifest.ManifestContent() == iceberg.ManifestContentData ||
                (manifest.ManifestContent() == iceberg.ManifestContentDeletes &&
                        manifest.SequenceNum() >= minSeqNum)
@@ -254,8 +259,8 @@ func matchDeletesToData(entry iceberg.ManifestEntry, 
positionalDeletes []iceberg
        return out, nil
 }
 
-func (s *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
-       snap := s.Snapshot()
+func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+       snap := scan.Snapshot()
        if snap == nil {
                return nil, nil
        }
@@ -263,8 +268,8 @@ func (s *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
        // step 1: filter manifests using partition summaries
        // the filter depends on the partition spec used to write the manifest 
file
        // so create a cache of filters for each spec id
-       manifestEvaluators := newKeyDefaultMapWrapErr(s.buildManifestEvaluator)
-       manifestList, err := snap.Manifests(s.io)
+       manifestEvaluators := 
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
+       manifestList, err := snap.Manifests(scan.io)
        if err != nil {
                return nil, err
        }
@@ -278,9 +283,9 @@ func (s *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
 
        // step 2: filter the data files in each manifest
        // this filter depends on the partition spec used to write the manifest 
file
-       partitionEvaluators := newKeyDefaultMap(s.buildPartitionEvaluator)
+       partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
        metricsEval, err := newInclusiveMetricsEvaluator(
-               s.metadata.CurrentSchema(), s.rowFilter, s.caseSensitive, 
s.options["include_empty_files"] == "true")
+               scan.metadata.CurrentSchema(), scan.rowFilter, 
scan.caseSensitive, scan.options["include_empty_files"] == "true")
        if err != nil {
                return nil, err
        }
@@ -289,7 +294,7 @@ func (s *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
        dataEntries := make([]iceberg.ManifestEntry, 0)
        positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)
 
-       nworkers := runtime.NumCPU()
+       nworkers := min(runtime.NumCPU(), len(manifestList))
        var wg sync.WaitGroup
 
        manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
@@ -309,11 +314,11 @@ func (s *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
                                                return
                                        }
 
-                                       if !s.checkSequenceNumber(minSeqNum, m) 
{
+                                       if !scan.checkSequenceNumber(minSeqNum, 
m) {
                                                continue
                                        }
 
-                                       entries, err := openManifest(s.io, m,
+                                       entries, err := openManifest(scan.io, m,
                                                
partitionEvaluators.Get(int(m.PartitionSpecID())), metricsEval)
                                        if err != nil {
                                                cancel(err)
@@ -393,3 +398,62 @@ type FileScanTask struct {
        DeleteFiles   []iceberg.DataFile
        Start, Length int64
 }
+
+// ToArrowRecords returns the arrow schema of the expected records and an 
interator
+// that can be used with a range expression to read the records as they are 
available.
+// If an error is encountered, during the planning and setup then this will 
return the
+// error directly. If the error occurs while iterating the records, it will be 
returned
+// by the iterator.
+//
+// The purpose for returning the schema up front is to handle the case where 
there are no
+// rows returned. The resulting Arrow Schema of the projection will still be 
known.
+func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, 
iter.Seq2[arrow.Record, error], error) {
+       tasks, err := scan.PlanFiles(ctx)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       var boundFilter iceberg.BooleanExpression
+       if scan.rowFilter != nil {
+               boundFilter, err = 
iceberg.BindExpr(scan.metadata.CurrentSchema(), scan.rowFilter, 
scan.caseSensitive)
+               if err != nil {
+                       return nil, nil, err
+               }
+       }
+
+       schema, err := scan.Projection()
+       if err != nil {
+               return nil, nil, err
+       }
+
+       return (&arrowScan{
+               metadata:        scan.metadata,
+               fs:              scan.io,
+               projectedSchema: schema,
+               boundRowFilter:  boundFilter,
+               caseSensitive:   scan.caseSensitive,
+               rowLimit:        scan.limit,
+               options:         scan.options,
+       }).GetRecords(ctx, tasks)
+}
+
+// ToArrowTable calls ToArrowRecords and then gathers all of the records 
together
+// and returns an arrow.Table make from those records.
+func (scan *Scan) ToArrowTable(ctx context.Context) (arrow.Table, error) {
+       schema, itr, err := scan.ToArrowRecords(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       records := make([]arrow.Record, 0)
+       for rec, err := range itr {
+               if err != nil {
+                       return nil, err
+               }
+
+               defer rec.Release()
+               records = append(records, rec)
+       }
+
+       return array.NewTableFromRecords(schema, records), nil
+}
diff --git a/table/scanner_test.go b/table/scanner_test.go
index af4b8f6..dcb7ed1 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -21,23 +21,46 @@ package table_test
 
 import (
        "context"
+       "iter"
+       "math"
+       "slices"
+       "strconv"
+       "strings"
        "testing"
 
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/extensions"
+       "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/catalog"
        "github.com/apache/iceberg-go/io"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/require"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/suite"
 )
 
-func TestScanner(t *testing.T) {
+type ScannerSuite struct {
+       suite.Suite
+
+       ctx   context.Context
+       cat   catalog.Catalog
+       props iceberg.Properties
+}
+
+func (s *ScannerSuite) SetupTest() {
+       s.ctx = context.Background()
+
        cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181";)
-       require.NoError(t, err)
+       s.Require().NoError(err)
 
-       props := iceberg.Properties{
+       s.cat = cat
+       s.props = iceberg.Properties{
                io.S3Region:      "us-east-1",
                io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password"}
+}
 
+func (s *ScannerSuite) TestScanner() {
        tests := []struct {
                table            string
                expr             iceberg.BooleanExpression
@@ -54,68 +77,549 @@ func TestScanner(t *testing.T) {
                {"test_partitioned_by_years", 
iceberg.LessThan(iceberg.Reference("dt"), "2023-03-05"), 1},
                {"test_partitioned_by_years", 
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
                {"test_partitioned_by_months", 
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
-               {"test_partitioned_by_days", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 
4},
+               {"test_partitioned_by_days", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 
8},
                {"test_partitioned_by_hours", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 
8},
                {"test_partitioned_by_truncate", 
iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e"), 8},
                {"test_partitioned_by_bucket", 
iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5)), 6},
-               {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4},
                {"test_uuid_and_fixed_unpartitioned", 
iceberg.EqualTo(iceberg.Reference("uuid_col"), 
"102cb62f-e6f8-4eb0-9973-d9b012ff0967"), 1},
        }
 
        for _, tt := range tests {
-               t.Run(tt.table+" "+tt.expr.String(), func(t *testing.T) {
+               s.Run(tt.table+" "+tt.expr.String(), func() {
                        ident := catalog.ToRestIdentifier("default", tt.table)
 
-                       tbl, err := cat.LoadTable(context.Background(), ident, 
props)
-                       require.NoError(t, err)
+                       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+                       s.Require().NoError(err)
 
-                       scan := tbl.Scan(tt.expr, 0, true, "*")
-                       tasks, err := scan.PlanFiles(context.Background())
-                       require.NoError(t, err)
+                       scan := tbl.Scan(table.WithRowFilter(tt.expr))
+                       tasks, err := scan.PlanFiles(s.ctx)
+                       s.Require().NoError(err)
 
-                       assert.Len(t, tasks, tt.expectedNumTasks)
+                       s.Len(tasks, tt.expectedNumTasks)
                })
        }
 }
 
-func TestScannerWithDeletes(t *testing.T) {
-       cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181";)
-       require.NoError(t, err)
-
-       props := iceberg.Properties{
-               io.S3Region:      "us-east-1",
-               io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password"}
-
+func (s *ScannerSuite) TestScannerWithDeletes() {
        ident := catalog.ToRestIdentifier("default", 
"test_positional_mor_deletes")
 
-       tbl, err := cat.LoadTable(context.Background(), ident, props)
-       require.NoError(t, err)
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
 
-       scan := tbl.Scan(iceberg.AlwaysTrue{}, 0, true, "*")
-       tasks, err := scan.PlanFiles(context.Background())
-       require.NoError(t, err)
+       scan := tbl.Scan()
+       tasks, err := scan.PlanFiles(s.ctx)
+       s.Require().NoError(err)
 
-       assert.Len(t, tasks, 1)
-       assert.Len(t, tasks[0].DeleteFiles, 1)
+       s.Len(tasks, 1)
+       s.Len(tasks[0].DeleteFiles, 1)
 
        tagScan, err := scan.UseRef("tag_12")
-       require.NoError(t, err)
+       s.Require().NoError(err)
 
-       tasks, err = tagScan.PlanFiles(context.Background())
-       require.NoError(t, err)
+       tasks, err = tagScan.PlanFiles(s.ctx)
+       s.Require().NoError(err)
 
-       assert.Len(t, tasks, 1)
-       assert.Len(t, tasks[0].DeleteFiles, 0)
+       s.Len(tasks, 1)
+       s.Len(tasks[0].DeleteFiles, 0)
 
        _, err = tagScan.UseRef("without_5")
-       assert.ErrorIs(t, err, iceberg.ErrInvalidArgument)
+       s.ErrorIs(err, iceberg.ErrInvalidArgument)
 
        tagScan, err = scan.UseRef("without_5")
-       require.NoError(t, err)
+       s.Require().NoError(err)
+
+       tasks, err = tagScan.PlanFiles(s.ctx)
+       s.Require().NoError(err)
+
+       s.Len(tasks, 1)
+       s.Len(tasks[0].DeleteFiles, 1)
+}
+
+func (s *ScannerSuite) TestArrowNan() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       for _, name := range []string{"test_null_nan", 
"test_null_nan_rewritten"} {
+               s.Run(name, func() {
+                       ident := catalog.ToRestIdentifier("default", name)
+                       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+                       s.Require().NoError(err)
+
+                       ctx := compute.WithAllocator(s.ctx, mem)
+                       results, err := 
tbl.Scan(table.WithRowFilter(iceberg.IsNaN(iceberg.Reference("col_numeric"))),
+                               table.WithSelectedFields("idx", 
"col_numeric")).ToArrowTable(ctx)
+                       s.Require().NoError(err)
+                       defer results.Release()
+
+                       s.EqualValues(2, results.NumCols())
+                       s.EqualValues(1, results.NumRows())
+
+                       s.Equal(int32(1), 
results.Column(0).Data().Chunk(0).(*array.Int32).Value(0))
+                       
s.True(math.IsNaN(float64(results.Column(1).Data().Chunk(0).(*array.Float32).Value(0))))
+               })
+       }
+}
+
+func (s *ScannerSuite) TestArrowNotNanCount() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToRestIdentifier("default", "test_null_nan")
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       results, err := 
tbl.Scan(table.WithRowFilter(iceberg.NotNaN(iceberg.Reference("col_numeric"))),
+               table.WithSelectedFields("idx")).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(1, results.NumCols())
+       s.EqualValues(2, results.NumRows())
+}
+
+func (s *ScannerSuite) TestScanWithLimit() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToRestIdentifier("default", "test_limit")
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       tests := []struct {
+               limit        int64
+               expectedRows int64
+       }{
+               {1, 1},
+               {0, 0},
+               {999, 10},
+       }
+
+       for _, tt := range tests {
+               s.Run(strconv.Itoa(int(tt.limit)), func() {
+                       scopedMem := memory.NewCheckedAllocatorScope(mem)
+                       defer scopedMem.CheckSize(s.T())
+
+                       ctx := compute.WithAllocator(s.ctx, mem)
+                       result, err := tbl.Scan(table.WithSelectedFields("idx"),
+                               table.WithLimit(tt.limit)).ToArrowTable(ctx)
+                       s.Require().NoError(err)
+                       defer result.Release()
+
+                       s.EqualValues(tt.expectedRows, result.NumRows())
+               })
+       }
+}
+
+func (s *ScannerSuite) TestScannerRecordsDeletes() {
+       // number, letter
+       //  (1, 'a'),
+       //  (2, 'b'),
+       //  (3, 'c'),
+       //  (4, 'd'),
+       //  (5, 'e'),
+       //  (6, 'f'),
+       //  (7, 'g'),
+       //  (8, 'h'),
+       //  (9, 'i'), <- deleted
+       //  (10, 'j'),
+       //  (11, 'k'),
+       //  (12, 'l')
+       ident := catalog.ToRestIdentifier("default", 
"test_positional_mor_deletes")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       ref := iceberg.Reference("letter")
+
+       tests := []struct {
+               name     string
+               filter   iceberg.BooleanExpression
+               rowLimit int64
+               expected string
+       }{
+               {"all", iceberg.AlwaysTrue{}, table.ScanNoLimit,
+                       `[1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]`},
+               {"filter", iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+                       iceberg.LessThan(ref, "k")), table.ScanNoLimit, `[5, 6, 
7, 8, 10]`},
+               {"filter and limit", 
iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+                       iceberg.LessThan(ref, "k")), 1, `[5]`},
+               {"limit", nil, 3, `[1, 2, 3]`},
+       }
+
+       for _, tt := range tests {
+               s.Run(tt.name, func() {
+                       scopedMem := memory.NewCheckedAllocatorScope(mem)
+                       defer scopedMem.CheckSize(s.T())
+
+                       ctx := compute.WithAllocator(s.ctx, mem)
+
+                       scan := tbl.Scan(table.WithRowFilter(tt.filter),
+                               table.WithSelectedFields("number"))
+                       tasks, err := scan.PlanFiles(ctx)
+                       s.Require().NoError(err)
+
+                       s.Len(tasks, 1)
+                       s.Len(tasks[0].DeleteFiles, 1)
+
+                       _, itr, err := 
scan.UseRowLimit(tt.rowLimit).ToArrowRecords(ctx)
+                       s.Require().NoError(err)
+
+                       next, stop := iter.Pull2(itr)
+                       defer stop()
+
+                       rec, err, valid := next()
+                       s.Require().True(valid)
+                       s.Require().NoError(err)
+                       defer rec.Release()
+
+                       s.True(expectedSchema.Equal(rec.Schema()), "expected: 
%s\ngot: %s\n",
+                               expectedSchema, rec.Schema())
+
+                       arr, _, err := array.FromJSON(mem, 
arrow.PrimitiveTypes.Int32,
+                               strings.NewReader(tt.expected))
+                       s.Require().NoError(err)
+                       defer arr.Release()
+
+                       expectedResult := array.NewRecord(expectedSchema, 
[]arrow.Array{arr}, int64(arr.Len()))
+                       defer expectedResult.Release()
+
+                       s.True(array.RecordEqual(expectedResult, rec), 
"expected: %s\ngot: %s\n", expectedResult, rec)
+
+                       _, err, valid = next()
+                       s.Require().NoError(err)
+                       s.Require().False(valid)
+               })
+       }
+}
+
+func (s *ScannerSuite) TestScannerRecordsDoubleDeletes() {
+       // number, letter
+       //  (1, 'a'),
+       //  (2, 'b'),
+       //  (3, 'c'),
+       //  (4, 'd'),
+       //  (5, 'e'),
+       //  (6, 'f'), <- second delete
+       //  (7, 'g'),
+       //  (8, 'h'),
+       //  (9, 'i'), <- first delete
+       //  (10, 'j'),
+       //  (11, 'k'),
+       //  (12, 'l')
+       ident := catalog.ToRestIdentifier("default", 
"test_positional_mor_double_deletes")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       ref := iceberg.Reference("letter")
+
+       tests := []struct {
+               name     string
+               filter   iceberg.BooleanExpression
+               rowLimit int64
+               expected string
+       }{
+               {"all", iceberg.AlwaysTrue{}, table.ScanNoLimit,
+                       `[1, 2, 3, 4, 5, 7, 8, 10, 11, 12]`},
+               {"filter", iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+                       iceberg.LessThan(ref, "k")), table.ScanNoLimit, `[5, 7, 
8, 10]`},
+               {"filter and limit", 
iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+                       iceberg.LessThan(ref, "k")), 1, `[5]`},
+               {"limit", nil, 8, `[1, 2, 3, 4, 5, 7, 8, 10]`},
+       }
+
+       for _, tt := range tests {
+               s.Run(tt.name, func() {
+                       scopedMem := memory.NewCheckedAllocatorScope(mem)
+                       defer scopedMem.CheckSize(s.T())
+
+                       ctx := compute.WithAllocator(s.ctx, mem)
+
+                       scan := tbl.Scan(table.WithRowFilter(tt.filter),
+                               table.WithSelectedFields("number"))
+                       tasks, err := scan.PlanFiles(ctx)
+                       s.Require().NoError(err)
+
+                       s.Len(tasks, 1)
+                       s.Len(tasks[0].DeleteFiles, 2)
 
-       tasks, err = tagScan.PlanFiles(context.Background())
-       require.NoError(t, err)
+                       _, itr, err := 
scan.UseRowLimit(tt.rowLimit).ToArrowRecords(ctx)
+                       s.Require().NoError(err)
 
-       assert.Len(t, tasks, 1)
-       assert.Len(t, tasks[0].DeleteFiles, 1)
+                       next, stop := iter.Pull2(itr)
+                       defer stop()
+
+                       rec, err, valid := next()
+                       s.Require().True(valid)
+                       s.Require().NoError(err)
+                       defer rec.Release()
+
+                       s.True(expectedSchema.Equal(rec.Schema()), "expected: 
%s\ngot: %s\n",
+                               expectedSchema, rec.Schema())
+
+                       arr, _, err := array.FromJSON(mem, 
arrow.PrimitiveTypes.Int32,
+                               strings.NewReader(tt.expected))
+                       s.Require().NoError(err)
+                       defer arr.Release()
+
+                       expectedResult := array.NewRecord(expectedSchema, 
[]arrow.Array{arr}, int64(arr.Len()))
+                       defer expectedResult.Release()
+
+                       s.True(array.RecordEqual(expectedResult, rec), 
"expected: %s\ngot: %s\n", expectedResult, rec)
+
+                       _, err, valid = next()
+                       s.Require().NoError(err)
+                       s.Require().False(valid)
+               })
+       }
+}
+
+func getSortedValues(col *arrow.Column) []int32 {
+       result := make([]int32, 0, col.Len())
+       for _, c := range col.Data().Chunks() {
+               arr := c.(*array.Int32)
+               result = append(result, arr.Int32Values()...)
+       }
+       slices.Sort(result)
+       return result
+}
+
+func getStrValues(col *arrow.Column) []string {
+       result := make([]string, 0, col.Len())
+       for _, c := range col.Data().Chunks() {
+               for i := 0; i < c.Len(); i++ {
+                       result = append(result, c.ValueStr(i))
+               }
+       }
+       return result
+}
+
+func (s *ScannerSuite) TestPartitionedTables() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       tests := []struct {
+               table     string
+               predicate iceberg.BooleanExpression
+       }{
+               {"test_partitioned_by_identity",
+                       iceberg.GreaterThanEqual(iceberg.Reference("ts"), 
"2023-03-05T00:00:00+00:00")},
+               {"test_partitioned_by_years", 
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05")},
+               {"test_partitioned_by_months", 
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05")},
+               {"test_partitioned_by_days", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00")},
+               {"test_partitioned_by_hours", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00")},
+               {"test_partitioned_by_truncate", 
iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e")},
+               {"test_partitioned_by_bucket", 
iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5))},
+       }
+
+       for _, tt := range tests {
+               s.Run(tt.table+" "+tt.predicate.String(), func() {
+                       scopedMem := memory.NewCheckedAllocatorScope(mem)
+                       defer scopedMem.CheckSize(s.T())
+                       ctx := compute.WithAllocator(s.ctx, mem)
+
+                       ident := catalog.ToRestIdentifier("default", tt.table)
+
+                       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+                       s.Require().NoError(err)
+
+                       scan := tbl.Scan(table.WithRowFilter(tt.predicate),
+                               table.WithSelectedFields("number"))
+                       resultTable, err := scan.ToArrowTable(ctx)
+                       s.Require().NoError(err)
+                       defer resultTable.Release()
+
+                       s.True(expectedSchema.Equal(resultTable.Schema()), 
"expected: %s\ngot: %s\n",
+                               expectedSchema, resultTable.Schema())
+
+                       s.Equal([]int32{5, 6, 7, 8, 9, 10, 11, 12},
+                               getSortedValues(resultTable.Column(0)))
+               })
+       }
+}
+
+func (s *ScannerSuite) TestUnpartitionedUUIDTable() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "uuid_col", Type: extensions.NewUUIDType(), Nullable: 
true},
+       }, nil)
+
+       ident := catalog.ToRestIdentifier("default", 
"test_uuid_and_fixed_unpartitioned")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       results, err := tbl.Scan(table.WithRowFilter(
+               iceberg.EqualTo(iceberg.Reference("uuid_col"),
+                       "102cb62f-e6f8-4eb0-9973-d9b012ff0967")),
+               table.WithSelectedFields("uuid_col")).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.True(expectedSchema.Equal(results.Schema()), "expected: %s\ngot: 
%s\n",
+               expectedSchema, results.Schema())
+
+       s.EqualValues(1, results.NumRows())
+       resultCol := results.Column(0).Data().Chunk(0).(*extensions.UUIDArray)
+       s.Equal("102cb62f-e6f8-4eb0-9973-d9b012ff0967", resultCol.ValueStr(0))
+
+       neqResults, err := tbl.Scan(table.WithRowFilter(
+               iceberg.NewAnd(
+                       iceberg.NotEqualTo(iceberg.Reference("uuid_col"),
+                               "102cb62f-e6f8-4eb0-9973-d9b012ff0967"),
+                       iceberg.NotEqualTo(iceberg.Reference("uuid_col"),
+                               "639cccce-c9d2-494a-a78c-278ab234f024"))),
+               table.WithSelectedFields("uuid_col")).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer neqResults.Release()
+
+       s.EqualValues(3, neqResults.NumRows())
+       s.Equal([]string{"ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226",
+               "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b",
+               "923dae77-83d6-47cd-b4b0-d383e64ee57e"}, 
getStrValues(neqResults.Column(0)))
+}
+
+func (s *ScannerSuite) TestUnpartitionedFixedTable() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToRestIdentifier("default", 
"test_uuid_and_fixed_unpartitioned")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       results, err := tbl.Scan(table.WithRowFilter(
+               iceberg.EqualTo(iceberg.Reference("fixed_col"),
+                       "1234567890123456789012345")),
+               table.WithCaseSensitive(false),
+               table.WithSelectedFields("fixed_col")).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(1, results.NumRows())
+       resultCol := results.Column(0).Data().Chunk(0).(*array.FixedSizeBinary)
+       s.Equal([]byte("1234567890123456789012345"), resultCol.Value(0))
+
+       results, err = tbl.Scan(table.WithRowFilter(
+               iceberg.NewAnd(
+                       iceberg.NotEqualTo(iceberg.Reference("fixed_col"), 
"1234567890123456789012345"),
+                       iceberg.NotEqualTo(iceberg.Reference("uuid_col"), 
"c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"))),
+               table.WithCaseSensitive(false), 
table.WithSelectedFields("fixed_col")).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(3, results.NumRows())
+       resultCol = results.Column(0).Data().Chunk(0).(*array.FixedSizeBinary)
+       s.Equal([]byte("1231231231231231231231231"), resultCol.Value(0))
+       resultCol = results.Column(0).Data().Chunk(1).(*array.FixedSizeBinary)
+       s.Equal([]byte("12345678901234567ass12345"), resultCol.Value(0))
+       resultCol = results.Column(0).Data().Chunk(2).(*array.FixedSizeBinary)
+       s.Equal([]byte("qweeqwwqq1231231231231111"), resultCol.Value(0))
+}
+
+func (s *ScannerSuite) TestScanTag() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToRestIdentifier("default", 
"test_positional_mor_deletes")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       scan, err := tbl.Scan().UseRef("tag_12")
+       s.Require().NoError(err)
+
+       results, err := scan.ToArrowTable(ctx)
+       defer results.Release()
+
+       s.EqualValues(3, results.NumCols())
+       s.Equal([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
+               results.Column(1).Data().Chunk(0).(*array.Int32).Int32Values())
+}
+
+func (s *ScannerSuite) TestScanBranch() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToRestIdentifier("default", 
"test_positional_mor_deletes")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       scan, err := tbl.Scan().UseRef("without_5")
+       s.Require().NoError(err)
+
+       results, err := scan.ToArrowTable(ctx)
+       defer results.Release()
+
+       s.EqualValues(3, results.NumCols())
+       s.Equal([]int32{1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12},
+               results.Column(1).Data().Chunk(0).(*array.Int32).Int32Values())
+}
+
+func (s *ScannerSuite) TestFilterOnNewColumn() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToRestIdentifier("default", "test_table_add_column")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+       s.Require().NoError(err)
+
+       ctx := compute.WithAllocator(s.ctx, mem)
+       results, err := tbl.Scan(table.WithRowFilter(
+               iceberg.EqualTo(iceberg.Reference("b"), "2"))).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(2, results.NumCols())
+       s.EqualValues(1, results.NumRows())
+       s.Equal("2", results.Column(1).Data().Chunk(0).(*array.String).Value(0))
+
+       results, err = tbl.Scan(table.WithRowFilter(
+               iceberg.NotNull(iceberg.Reference("b")))).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(2, results.NumCols())
+       s.EqualValues(1, results.NumRows())
+       s.Equal("2", results.Column(1).Data().Chunk(0).(*array.String).Value(0))
+
+       results, err = tbl.Scan(table.WithRowFilter(
+               iceberg.IsNull(iceberg.Reference("b")))).ToArrowTable(ctx)
+       s.Require().NoError(err)
+       defer results.Release()
+
+       s.EqualValues(2, results.NumCols())
+       s.EqualValues(1, results.NumRows())
+       s.False(results.Column(1).Data().Chunk(0).(*array.String).IsValid(0))
+}
+
+func TestScanner(t *testing.T) {
+       suite.Run(t, new(ScannerSuite))
 }
diff --git a/table/substrait/functions_set.yaml 
b/table/substrait/functions_set.yaml
new file mode 100644
index 0000000..05ec383
--- /dev/null
+++ b/table/substrait/functions_set.yaml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+%YAML 1.2
+---
+scalar_functions:
+  -
+    name: "is_in"
+    description: >
+      Checks membership of a value in a list of values
+
+      Returns true or false if `needle` is found in `haystack`.
+    impls:
+      - args:
+        - name: needle
+          value: any1
+        - name: haystack
+          value: list<any1>
+        options:
+          nan_equality:
+            values: [ NAN_IS_NAN, NAN_IS_NOT_NAN ]
+        return: "boolean"
diff --git a/table/substrait/substrait.go b/table/substrait/substrait.go
new file mode 100644
index 0000000..b6078a6
--- /dev/null
+++ b/table/substrait/substrait.go
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package substrait
+
+import (
+       _ "embed"
+       "fmt"
+       "strings"
+
+       "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+       "github.com/apache/iceberg-go"
+       "github.com/substrait-io/substrait-go/expr"
+       "github.com/substrait-io/substrait-go/extensions"
+       "github.com/substrait-io/substrait-go/types"
+)
+
+//go:embed functions_set.yaml
+var funcsetYAML string
+
+var (
+       collection = extensions.DefaultCollection
+       funcSetURI = 
"https://github.com/apache/iceberg-go/blob/main/table/substrait/functions_set.yaml";
+)
+
+func init() {
+       if !collection.URILoaded(funcSetURI) {
+               if err := collection.Load(funcSetURI, 
strings.NewReader(funcsetYAML)); err != nil {
+                       panic(err)
+               }
+       }
+}
+
+func NewExtensionSet() exprs.ExtensionIDSet {
+       return 
exprs.NewExtensionSetDefault(expr.NewEmptyExtensionRegistry(&collection))
+}
+
+// ConvertExpr binds the provided expression to the given schema and converts 
it to a
+// substrait expression so that it can be utilized for computation.
+func ConvertExpr(schema *iceberg.Schema, e iceberg.BooleanExpression, 
caseSensitive bool) (*expr.ExtensionRegistry, expr.Expression, error) {
+       base, err := ConvertSchema(schema)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       reg := expr.NewEmptyExtensionRegistry(&extensions.DefaultCollection)
+
+       bldr := expr.ExprBuilder{Reg: reg, BaseSchema: &base.Struct}
+       b, err := iceberg.VisitExpr(e, &toSubstraitExpr{bldr: bldr, schema: 
schema,
+               caseSensitive: caseSensitive})
+       if err != nil {
+               return nil, nil, err
+       }
+
+       out, err := b.BuildExpr()
+       return &reg, out, err
+}
+
+// ConvertSchema converts an Iceberg schema to a substrait NamedStruct using
+// the appropriate types and column names.
+func ConvertSchema(schema *iceberg.Schema) (res types.NamedStruct, err error) {
+       var typ types.Type
+
+       typ, err = iceberg.Visit(schema, convertToSubstrait{})
+       if err != nil {
+               return
+       }
+
+       val := typ.(*types.StructType)
+       res.Struct = *val
+
+       res.Names = make([]string, schema.NumFields())
+       for i, f := range schema.Fields() {
+               res.Names[i] = f.Name
+       }
+
+       return
+}
+
+type convertToSubstrait struct{}
+
+func (convertToSubstrait) Schema(_ *iceberg.Schema, result types.Type) 
types.Type {
+       return result.WithNullability(types.NullabilityNullable)
+}
+
+func (convertToSubstrait) Struct(_ iceberg.StructType, results []types.Type) 
types.Type {
+       return &types.StructType{
+               Nullability: types.NullabilityUnspecified,
+               Types:       results,
+       }
+}
+
+func getNullability(required bool) types.Nullability {
+       if required {
+               return types.NullabilityRequired
+       }
+       return types.NullabilityNullable
+}
+
+func (convertToSubstrait) Field(field iceberg.NestedField, result types.Type) 
types.Type {
+       return result.WithNullability(getNullability(field.Required))
+}
+
+func (c convertToSubstrait) List(list iceberg.ListType, elemResult types.Type) 
types.Type {
+       return &types.ListType{
+               Nullability: types.NullabilityUnspecified,
+               Type:        c.Field(list.ElementField(), elemResult),
+       }
+}
+
+func (c convertToSubstrait) Map(m iceberg.MapType, keyResult, valResult 
types.Type) types.Type {
+       return &types.MapType{
+               Nullability: types.NullabilityUnspecified,
+               Key:         c.Field(m.KeyField(), keyResult),
+               Value:       c.Field(m.ValueField(), valResult),
+       }
+}
+
+func (convertToSubstrait) Primitive(iceberg.PrimitiveType) types.Type { 
panic("should not be called") }
+
+func (convertToSubstrait) VisitFixed(f iceberg.FixedType) types.Type {
+       return &types.FixedBinaryType{Length: int32(f.Len())}
+}
+
+func (convertToSubstrait) VisitDecimal(d iceberg.DecimalType) types.Type {
+       return &types.DecimalType{Precision: int32(d.Precision()), Scale: 
int32(d.Scale())}
+}
+
+func (convertToSubstrait) VisitBoolean() types.Type     { return 
&types.BooleanType{} }
+func (convertToSubstrait) VisitInt32() types.Type       { return 
&types.Int32Type{} }
+func (convertToSubstrait) VisitInt64() types.Type       { return 
&types.Int64Type{} }
+func (convertToSubstrait) VisitFloat32() types.Type     { return 
&types.Float32Type{} }
+func (convertToSubstrait) VisitFloat64() types.Type     { return 
&types.Float64Type{} }
+func (convertToSubstrait) VisitDate() types.Type        { return 
&types.DateType{} }
+func (convertToSubstrait) VisitTime() types.Type        { return 
&types.TimeType{} }
+func (convertToSubstrait) VisitTimestamp() types.Type   { return 
&types.TimestampType{} }
+func (convertToSubstrait) VisitTimestampTz() types.Type { return 
&types.TimestampTzType{} }
+func (convertToSubstrait) VisitString() types.Type      { return 
&types.StringType{} }
+func (convertToSubstrait) VisitBinary() types.Type      { return 
&types.BinaryType{} }
+func (convertToSubstrait) VisitUUID() types.Type        { return 
&types.UUIDType{} }
+
+var (
+       _ iceberg.SchemaVisitorPerPrimitiveType[types.Type] = 
(*convertToSubstrait)(nil)
+)
+
+var (
+       boolURI    = extensions.SubstraitDefaultURIPrefix + 
"functions_boolean.yaml"
+       compareURI = extensions.SubstraitDefaultURIPrefix + 
"functions_comparison.yaml"
+       stringURI  = extensions.SubstraitDefaultURIPrefix + 
"functions_string.yaml"
+
+       notID          = extensions.ID{URI: boolURI, Name: "not"}
+       andID          = extensions.ID{URI: boolURI, Name: "and"}
+       orID           = extensions.ID{URI: boolURI, Name: "or"}
+       isNaNID        = extensions.ID{URI: compareURI, Name: "is_nan"}
+       isNullID       = extensions.ID{URI: compareURI, Name: "is_null"}
+       isNotNullID    = extensions.ID{URI: compareURI, Name: "is_not_null"}
+       equalID        = extensions.ID{URI: compareURI, Name: "equal"}
+       notEqualID     = extensions.ID{URI: compareURI, Name: "not_equal"}
+       greaterEqualID = extensions.ID{URI: compareURI, Name: "gte"}
+       greaterID      = extensions.ID{URI: compareURI, Name: "gt"}
+       lessEqualID    = extensions.ID{URI: compareURI, Name: "lte"}
+       lessID         = extensions.ID{URI: compareURI, Name: "lt"}
+       startsWithID   = extensions.ID{URI: stringURI, Name: "starts_with"}
+       isInID         = extensions.ID{URI: funcSetURI, Name: "is_in"}
+)
+
+type toSubstraitExpr struct {
+       schema        *iceberg.Schema
+       bldr          expr.ExprBuilder
+       caseSensitive bool
+}
+
+func (t *toSubstraitExpr) VisitTrue() expr.Builder {
+       return t.bldr.Wrap(expr.NewLiteral(true, false))
+}
+
+func (t *toSubstraitExpr) VisitFalse() expr.Builder {
+       return t.bldr.Wrap(expr.NewLiteral(false, false))
+}
+
+func (t *toSubstraitExpr) VisitNot(child expr.Builder) expr.Builder {
+       return t.bldr.ScalarFunc(notID).Args(child.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitAnd(left, right expr.Builder) expr.Builder {
+       return t.bldr.ScalarFunc(andID).Args(left.(expr.FuncArgBuilder),
+               right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitOr(left, right expr.Builder) expr.Builder {
+       return t.bldr.ScalarFunc(orID).Args(left.(expr.FuncArgBuilder),
+               right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitUnbound(iceberg.UnboundPredicate) expr.Builder {
+       panic("can only convert bound expressions to substrait")
+}
+
+func (t *toSubstraitExpr) VisitBound(pred iceberg.BoundPredicate) expr.Builder 
{
+       return iceberg.VisitBoundPredicate(pred, t)
+}
+
+type substraitPrimitiveLiteralTypes interface {
+       bool | ~int32 | ~int64 | float32 | float64 | string
+}
+
+func toPrimitiveSubstraitLiteral[T substraitPrimitiveLiteralTypes](v T) 
expr.Literal {
+       return expr.NewPrimitiveLiteral(v, false)
+}
+
+func toByteSliceSubstraitLiteral[T []byte | types.UUID](v T) expr.Literal {
+       return expr.NewByteSliceLiteral(v, false)
+}
+
+func toDecimalLiteral(v iceberg.DecimalLiteral) expr.Literal {
+       byts, _ := v.MarshalBinary()
+       result, _ := expr.NewLiteral(&types.Decimal{
+               Scale:     int32(v.Scale),
+               Value:     byts,
+               Precision: int32(v.Type().(*iceberg.DecimalType).Precision()),
+       }, false)
+       return result
+}
+
+func toFixedLiteral(v iceberg.FixedLiteral) expr.Literal {
+       return expr.NewFixedBinaryLiteral(types.FixedBinary(v), false)
+}
+
+func toSubstraitLiteral(typ iceberg.Type, lit iceberg.Literal) expr.Literal {
+       switch lit := lit.(type) {
+       case iceberg.BoolLiteral:
+               return toPrimitiveSubstraitLiteral(bool(lit))
+       case iceberg.Int32Literal:
+               return toPrimitiveSubstraitLiteral(int32(lit))
+       case iceberg.Int64Literal:
+               return toPrimitiveSubstraitLiteral(int64(lit))
+       case iceberg.Float32Literal:
+               return toPrimitiveSubstraitLiteral(float32(lit))
+       case iceberg.Float64Literal:
+               return toPrimitiveSubstraitLiteral(float64(lit))
+       case iceberg.StringLiteral:
+               return toPrimitiveSubstraitLiteral(string(lit))
+       case iceberg.TimestampLiteral:
+               if typ.Equals(iceberg.PrimitiveTypes.TimestampTz) {
+                       return 
toPrimitiveSubstraitLiteral(types.TimestampTz(lit))
+               }
+               return toPrimitiveSubstraitLiteral(types.Timestamp(lit))
+       case iceberg.DateLiteral:
+               return toPrimitiveSubstraitLiteral(types.Date(lit))
+       case iceberg.TimeLiteral:
+               return toPrimitiveSubstraitLiteral(types.Time(lit))
+       case iceberg.BinaryLiteral:
+               return toByteSliceSubstraitLiteral([]byte(lit))
+       case iceberg.FixedLiteral:
+               return toFixedLiteral(lit)
+       case iceberg.UUIDLiteral:
+               return toByteSliceSubstraitLiteral(types.UUID(lit[:]))
+       case iceberg.DecimalLiteral:
+               return toDecimalLiteral(lit)
+       }
+       panic(fmt.Errorf("invalid literal type: %s", lit.Type()))
+}
+
+func toSubstraitLiteralSet(typ iceberg.Type, lits []iceberg.Literal) 
expr.ListLiteralValue {
+       if len(lits) == 0 {
+               return nil
+       }
+
+       out := make([]expr.Literal, len(lits))
+       for i, l := range lits {
+               out[i] = toSubstraitLiteral(typ, l)
+       }
+       return out
+}
+
+func (t *toSubstraitExpr) getRef(ref iceberg.BoundReference) expr.Reference {
+       updatedRef, err := iceberg.Reference(ref.Field().Name).Bind(t.schema, 
t.caseSensitive)
+       if err != nil {
+               panic(err)
+       }
+
+       path := updatedRef.Ref().PosPath()
+       out := expr.NewStructFieldRef(int32(path[0]))
+       if len(path) == 1 {
+               return out
+       }
+
+       cur := out
+       for _, p := range path[1:] {
+               next := expr.NewStructFieldRef(int32(p))
+               cur.Child, cur = next, next
+       }
+       return out
+}
+
+func (t *toSubstraitExpr) makeSetFunc(id extensions.ID, term 
iceberg.BoundTerm, lits iceberg.Set[iceberg.Literal]) expr.Builder {
+       val := toSubstraitLiteralSet(term.Type(), lits.Members())
+       return t.bldr.ScalarFunc(id).Args(t.bldr.RootRef(t.getRef(term.Ref())),
+               t.bldr.Literal(expr.NewNestedLiteral(val, false)))
+}
+
+func (t *toSubstraitExpr) VisitIn(term iceberg.BoundTerm, lits 
iceberg.Set[iceberg.Literal]) expr.Builder {
+       return t.makeSetFunc(isInID, term, lits)
+}
+
+func (t *toSubstraitExpr) VisitNotIn(term iceberg.BoundTerm, lits 
iceberg.Set[iceberg.Literal]) expr.Builder {
+       return t.bldr.ScalarFunc(notID).Args(t.makeSetFunc(isInID, term, 
lits).(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) makeRefFunc(id extensions.ID, term 
iceberg.BoundTerm) expr.Builder {
+       return t.bldr.ScalarFunc(id).Args(t.bldr.RootRef(t.getRef(term.Ref())))
+}
+
+func (t *toSubstraitExpr) VisitIsNan(term iceberg.BoundTerm) expr.Builder {
+       return t.makeRefFunc(isNaNID, term)
+}
+
+func (t *toSubstraitExpr) VisitNotNan(term iceberg.BoundTerm) expr.Builder {
+       return t.bldr.ScalarFunc(notID).Args(
+               t.makeRefFunc(isNaNID, term).(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitIsNull(term iceberg.BoundTerm) expr.Builder {
+       return t.makeRefFunc(isNullID, term)
+}
+
+func (t *toSubstraitExpr) VisitNotNull(term iceberg.BoundTerm) expr.Builder {
+       return t.makeRefFunc(isNotNullID, term)
+}
+
+func (t *toSubstraitExpr) makeLitFunc(id extensions.ID, term 
iceberg.BoundTerm, lit iceberg.Literal) expr.Builder {
+       return t.bldr.ScalarFunc(id).Args(t.bldr.RootRef(t.getRef(term.Ref())),
+               t.bldr.Literal(toSubstraitLiteral(term.Type(), lit)))
+}
+
+func (t *toSubstraitExpr) VisitEqual(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(equalID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitNotEqual(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(notEqualID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitGreaterEqual(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(greaterEqualID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitGreater(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(greaterID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitLessEqual(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(lessEqualID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitLess(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(lessID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitStartsWith(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.makeLitFunc(startsWithID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitNotStartsWith(term iceberg.BoundTerm, lit 
iceberg.Literal) expr.Builder {
+       return t.bldr.ScalarFunc(notID).Args(
+               t.makeLitFunc(startsWithID, term, lit).(expr.FuncArgBuilder))
+}
diff --git a/table/substrait/substrait_test.go 
b/table/substrait/substrait_test.go
new file mode 100644
index 0000000..387a5a8
--- /dev/null
+++ b/table/substrait/substrait_test.go
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package substrait_test
+
+import (
+       "fmt"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table/substrait"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "github.com/substrait-io/substrait-go/types"
+)
+
+func TestRefTypes(t *testing.T) {
+       sc := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "a", Type: 
iceberg.PrimitiveTypes.Bool},
+               iceberg.NestedField{ID: 2, Name: "b", Type: 
iceberg.PrimitiveTypes.Int32},
+               iceberg.NestedField{ID: 3, Name: "c", Type: 
iceberg.PrimitiveTypes.Int64},
+               iceberg.NestedField{ID: 4, Name: "d", Type: 
iceberg.PrimitiveTypes.Float32},
+               iceberg.NestedField{ID: 5, Name: "e", Type: 
iceberg.PrimitiveTypes.Float64},
+               iceberg.NestedField{ID: 6, Name: "f", Type: 
iceberg.PrimitiveTypes.Date},
+               iceberg.NestedField{ID: 7, Name: "g", Type: 
iceberg.PrimitiveTypes.Time},
+               iceberg.NestedField{ID: 8, Name: "h", Type: 
iceberg.PrimitiveTypes.Timestamp},
+               iceberg.NestedField{ID: 9, Name: "i", Type: 
iceberg.DecimalTypeOf(9, 2)},
+               iceberg.NestedField{ID: 10, Name: "j", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 11, Name: "k", Type: 
iceberg.PrimitiveTypes.Binary},
+               iceberg.NestedField{ID: 12, Name: "l", Type: 
iceberg.PrimitiveTypes.UUID},
+               iceberg.NestedField{ID: 13, Name: "m", Type: 
iceberg.FixedTypeOf(5)})
+
+       tests := []struct {
+               name string
+               exp  types.Type
+       }{
+               {"a", &types.BooleanType{}},
+               {"b", &types.Int32Type{}},
+               {"c", &types.Int64Type{}},
+               {"d", &types.Float32Type{}},
+               {"e", &types.Float64Type{}},
+               {"f", &types.DateType{}},
+               {"g", &types.TimeType{}},
+               {"h", &types.TimestampType{}},
+               {"i", &types.DecimalType{Scale: 2, Precision: 9}},
+               {"j", &types.StringType{}},
+               {"k", &types.BinaryType{}},
+               {"l", &types.UUIDType{}},
+               {"m", &types.FixedBinaryType{Length: 5}},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ref, err := 
iceberg.IsNull(iceberg.Reference(tt.name)).Bind(sc, true)
+                       require.NoError(t, err)
+
+                       idx := ref.(iceberg.BoundPredicate).Term().Ref().Pos()
+
+                       _, converted, err := substrait.ConvertExpr(sc, ref, 
true)
+                       require.NoError(t, err)
+
+                       assert.Equal(t, fmt.Sprintf("is_null(.field(%d) => %s) 
=> boolean", idx,
+                               
tt.exp.WithNullability(types.NullabilityNullable)), converted.String())
+               })
+       }
+}
+
+var (
+       tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1,
+               []int{2},
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 2, Name: "bar", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 3, Name: "baz", Type: 
iceberg.PrimitiveTypes.Bool},
+       )
+
+       doubleSchema = iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.Float64})
+)
+
+func TestExprs(t *testing.T) {
+       ref := iceberg.Reference("foo")
+
+       tests := []struct {
+               e        iceberg.BooleanExpression
+               schema   *iceberg.Schema
+               expected string
+       }{
+               {iceberg.IsNull(ref), tableSchemaSimple, "is_null(.field(0) => 
string?) => boolean"},
+               {iceberg.NotNull(ref), tableSchemaSimple, 
"is_not_null(.field(0) => string?) => boolean"},
+               {iceberg.IsNaN(ref), doubleSchema, "is_nan(.field(0) => fp64?) 
=> boolean?"},
+               {iceberg.NotNaN(ref), doubleSchema, "not(is_nan(.field(0) => 
fp64?) => boolean?) => boolean?"},
+               {iceberg.EqualTo(ref, "hello"), tableSchemaSimple, 
"equal(.field(0) => string?, string(hello)) => boolean?"},
+               {iceberg.NotEqualTo(ref, "hello"), tableSchemaSimple, 
"not_equal(.field(0) => string?, string(hello)) => boolean?"},
+               {iceberg.GreaterThanEqual(ref, "hello"), tableSchemaSimple, 
"gte(.field(0) => string?, string(hello)) => boolean?"},
+               {iceberg.GreaterThan(ref, "hello"), tableSchemaSimple, 
"gt(.field(0) => string?, string(hello)) => boolean?"},
+               {iceberg.LessThanEqual(ref, "hello"), tableSchemaSimple, 
"lte(.field(0) => string?, string(hello)) => boolean?"},
+               {iceberg.LessThan(ref, "hello"), tableSchemaSimple, 
"lt(.field(0) => string?, string(hello)) => boolean?"},
+               {iceberg.StartsWith(ref, "he"), tableSchemaSimple, 
"starts_with(.field(0) => string?, string(he)) => boolean?"},
+               {iceberg.NotStartsWith(ref, "he"), tableSchemaSimple, 
"not(starts_with(.field(0) => string?, string(he)) => boolean?) => boolean?"},
+               {iceberg.NewAnd(iceberg.EqualTo(ref, "hello"), 
iceberg.IsNull(ref)), tableSchemaSimple,
+                       "and(equal(.field(0) => string?, string(hello)) => 
boolean?, is_null(.field(0) => string?) => boolean) => boolean?"},
+               {iceberg.NewOr(iceberg.EqualTo(ref, "hello"), 
iceberg.IsNull(ref)), tableSchemaSimple,
+                       "or(equal(.field(0) => string?, string(hello)) => 
boolean?, is_null(.field(0) => string?) => boolean) => boolean?"},
+               {iceberg.NewNot(iceberg.EqualTo(ref, "hello")), 
tableSchemaSimple,
+                       "not(equal(.field(0) => string?, string(hello)) => 
boolean?) => boolean?"},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.e.String(), func(t *testing.T) {
+                       bound, err := iceberg.BindExpr(tt.schema, tt.e, false)
+                       require.NoError(t, err)
+
+                       _, result, err := substrait.ConvertExpr(tt.schema, 
bound, true)
+                       require.NoError(t, err)
+                       assert.Equal(t, tt.expected, result.String())
+               })
+       }
+}
diff --git a/table/table.go b/table/table.go
index 7064add..350782a 100644
--- a/table/table.go
+++ b/table/table.go
@@ -60,17 +60,78 @@ func (t Table) Schemas() map[int]*iceberg.Schema {
        return m
 }
 
-func (t Table) Scan(rowFilter iceberg.BooleanExpression, snapshotID int64, 
caseSensitive bool, fields ...string) *Scan {
+type ScanOption func(*Scan)
+
+func noopOption(*Scan) {}
+
+func WithSelectedFields(fields ...string) ScanOption {
+       if len(fields) == 0 || slices.Contains(fields, "*") {
+               return noopOption
+       }
+
+       return func(scan *Scan) {
+               scan.selectedFields = fields
+       }
+}
+
+func WithRowFilter(e iceberg.BooleanExpression) ScanOption {
+       if e == nil || e.Equals(iceberg.AlwaysTrue{}) {
+               return noopOption
+       }
+
+       return func(scan *Scan) {
+               scan.rowFilter = e
+       }
+}
+
+func WithSnapshotID(n int64) ScanOption {
+       if n == 0 {
+               return noopOption
+       }
+
+       return func(scan *Scan) {
+               scan.snapshotID = &n
+       }
+}
+
+func WithCaseSensitive(b bool) ScanOption {
+       return func(scan *Scan) {
+               scan.caseSensitive = b
+       }
+}
+
+func WithLimit(n int64) ScanOption {
+       if n < 0 {
+               return noopOption
+       }
+
+       return func(scan *Scan) {
+               scan.limit = n
+       }
+}
+
+func WithOptions(opts iceberg.Properties) ScanOption {
+       if opts == nil {
+               return noopOption
+       }
+
+       return func(scan *Scan) {
+               scan.options = opts
+       }
+}
+
+func (t Table) Scan(opts ...ScanOption) *Scan {
        s := &Scan{
                metadata:       t.metadata,
                io:             t.fs,
-               rowFilter:      rowFilter,
-               selectedFields: fields,
-               caseSensitive:  caseSensitive,
+               rowFilter:      iceberg.AlwaysTrue{},
+               selectedFields: []string{"*"},
+               caseSensitive:  true,
+               limit:          ScanNoLimit,
        }
 
-       if snapshotID != 0 {
-               s.snapshotID = &snapshotID
+       for _, opt := range opts {
+               opt(s)
        }
 
        s.partitionFilters = newKeyDefaultMapWrapErr(s.buildPartitionProjection)
diff --git a/types.go b/types.go
index ebc8849..37723e8 100644
--- a/types.go
+++ b/types.go
@@ -36,6 +36,14 @@ var (
 
 type Properties map[string]string
 
+// Get returns the value of the key if it exists, otherwise it returns the 
default value.
+func (p Properties) Get(key, defVal string) string {
+       if v, ok := p[key]; ok {
+               return v
+       }
+       return defVal
+}
+
 // Type is an interface representing any of the available iceberg types,
 // such as primitives (int32/int64/etc.) or nested types (list/struct/map).
 type Type interface {
diff --git a/visitors.go b/visitors.go
index bb0caab..f93c75a 100644
--- a/visitors.go
+++ b/visitors.go
@@ -444,3 +444,56 @@ func (expressionFieldIDs) VisitBound(pred BoundPredicate) 
map[int]struct{} {
                pred.Ref().Field().ID: {},
        }
 }
+
+// TranslateColumnNames converts the names of columns in an expression by 
looking up
+// the field IDs in the file schema. If columns don't exist they are replaced 
with
+// AlwaysFalse or AlwaysTrue depending on the operator.
+func TranslateColumnNames(expr BooleanExpression, fileSchema *Schema) 
(BooleanExpression, error) {
+       return VisitExpr(expr, columnNameTranslator{fileSchema: fileSchema})
+}
+
+type columnNameTranslator struct {
+       fileSchema *Schema
+}
+
+func (columnNameTranslator) VisitTrue() BooleanExpression  { return 
AlwaysTrue{} }
+func (columnNameTranslator) VisitFalse() BooleanExpression { return 
AlwaysFalse{} }
+func (columnNameTranslator) VisitNot(child BooleanExpression) 
BooleanExpression {
+       return NewNot(child)
+}
+
+func (columnNameTranslator) VisitAnd(left, right BooleanExpression) 
BooleanExpression {
+       return NewAnd(left, right)
+}
+
+func (columnNameTranslator) VisitOr(left, right BooleanExpression) 
BooleanExpression {
+       return NewOr(left, right)
+}
+
+func (columnNameTranslator) VisitUnbound(pred UnboundPredicate) 
BooleanExpression {
+       panic(fmt.Errorf("%w: expected bound predicate, got: %s", 
ErrInvalidArgument, pred.Term()))
+}
+
+func (c columnNameTranslator) VisitBound(pred BoundPredicate) 
BooleanExpression {
+       fileColName, found := 
c.fileSchema.FindColumnName(pred.Term().Ref().Field().ID)
+       if !found {
+               // in the case of schema evolution, the column might not be 
present
+               // in the file schema when reading older data
+               if pred.Op() == OpIsNull {
+                       return AlwaysTrue{}
+               }
+               return AlwaysFalse{}
+       }
+
+       ref := Reference(fileColName)
+       switch p := pred.(type) {
+       case BoundUnaryPredicate:
+               return p.AsUnbound(ref)
+       case BoundLiteralPredicate:
+               return p.AsUnbound(ref, p.Literal())
+       case BoundSetPredicate:
+               return p.AsUnbound(ref, p.Literals().Members())
+       default:
+               panic(fmt.Errorf("%w: unsupported predicate: %s", 
ErrNotImplemented, pred))
+       }
+}

Reply via email to