This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new ac5c84d feat(table): Initial implementation of Reading Data (#185)
ac5c84d is described below
commit ac5c84d0a6e79ad66978e3e661eedb6f49edffda
Author: Matt Topol <[email protected]>
AuthorDate: Fri Nov 8 09:47:23 2024 +0100
feat(table): Initial implementation of Reading Data (#185)
* initial sketch
* it works!
* some refactoring
* cleanup and comments
* comments and docs
* update readme
* license fix
* update arrow dep
* fix propagation of limits from UseRef
* spark sometimes writes out files differently
* remove flaky test change
* Update README.md
Co-authored-by: Fokko Driesprong <[email protected]>
* Update manifest.go
Co-authored-by: Fokko Driesprong <[email protected]>
* Update manifest.go
Co-authored-by: Fokko Driesprong <[email protected]>
* updates from feedback
* skip non-position deletes
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
README.md | 10 +-
go.mod | 14 +-
go.sum | 51 +++-
literals.go | 8 +-
manifest.go | 84 +++---
table/arrow_scanner.go | 601 +++++++++++++++++++++++++++++++++++++
table/arrow_utils.go | 16 +-
table/arrow_utils_test.go | 14 +-
table/evaluators.go | 56 ++++
table/internal/interfaces.go | 68 +++++
table/internal/parquet_files.go | 420 ++++++++++++++++++++++++++
table/internal/utils.go | 76 +++++
table/scanner.go | 172 +++++++----
table/scanner_test.go | 584 ++++++++++++++++++++++++++++++++---
table/substrait/functions_set.yaml | 36 +++
table/substrait/substrait.go | 381 +++++++++++++++++++++++
table/substrait/substrait_test.go | 132 ++++++++
table/table.go | 73 ++++-
types.go | 8 +
visitors.go | 53 ++++
20 files changed, 2690 insertions(+), 167 deletions(-)
diff --git a/README.md b/README.md
index aeb249d..58dc718 100644
--- a/README.md
+++ b/README.md
@@ -55,8 +55,8 @@ $ cd iceberg-go/cmd/iceberg && go build .
| Get Partition Specs | X |
| Get Manifests | X |
| Create New Manifests | X |
-| Plan Scan | |
-| Plan Scan for Snapshot | |
+| Plan Scan | X |
+| Plan Scan for Snapshot | X |
### Catalog Support
@@ -77,9 +77,9 @@ $ cd iceberg-go/cmd/iceberg && go build .
### Read/Write Data Support
-* No intrinsic support for reading/writing data yet
-* Data can be manually read currently by retrieving data files via Manifests.
-* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow/go/[email protected]) support eventually.
+* No intrinsic support for writing data yet.
+* Plan to add [Apache Arrow](https://pkg.go.dev/github.com/apache/arrow-go/) support eventually.
+* Data can currently be read as an Arrow Table or as a stream of Arrow record batches.
# Get in Touch
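To make the new capability concrete, here is a minimal sketch of the read path this change enables. It assumes the scan API added in table/scanner.go by this commit (Scan, ToArrowRecords); treat the exact names and signatures as illustrative rather than authoritative:

package main

import (
	"context"
	"fmt"

	"github.com/apache/iceberg-go/table"
)

func printRowCounts(ctx context.Context, tbl *table.Table) error {
	// Plan a scan of the current snapshot, then stream the results
	// back as Arrow record batches.
	sc, itr, err := tbl.Scan().ToArrowRecords(ctx)
	if err != nil {
		return err
	}
	fmt.Println(sc) // the projected Arrow schema

	for rec, err := range itr { // iter.Seq2[arrow.Record, error], Go 1.23
		if err != nil {
			return err
		}
		fmt.Println(rec.NumRows())
		rec.Release()
	}
	return nil
}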
diff --git a/go.mod b/go.mod
index 8d2f309..c41a0a7 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ go 1.23
toolchain go1.23.2
require (
- github.com/apache/arrow-go/v18 v18.0.1-0.20241022184425-56b794f52a9b
+ github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f
github.com/aws/aws-sdk-go-v2 v1.32.3
github.com/aws/aws-sdk-go-v2/config v1.28.1
github.com/aws/aws-sdk-go-v2/credentials v1.17.42
@@ -34,8 +34,10 @@ require (
github.com/hamba/avro/v2 v2.27.0
github.com/pterm/pterm v0.12.79
github.com/stretchr/testify v1.9.0
+ github.com/substrait-io/substrait-go v1.1.0
github.com/twmb/murmur3 v1.1.8
github.com/wolfeidau/s3iofs v1.5.2
+ golang.org/x/sync v0.8.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -44,6 +46,7 @@ require (
atomicgo.dev/keyboard v0.2.9 // indirect
atomicgo.dev/schedule v0.1.0 // indirect
	github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
+ github.com/alecthomas/participle/v2 v2.1.0 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/apache/thrift v0.21.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
@@ -60,8 +63,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.3 // indirect
github.com/containerd/console v1.0.3 // indirect
+ github.com/creasty/defaults v1.8.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/fatih/color v1.15.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
+ github.com/goccy/go-yaml v1.11.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/gookit/color v1.5.4 // indirect
@@ -70,6 +76,8 @@ require (
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/lithammer/fuzzysearch v1.1.8 // indirect
+ github.com/mattn/go-colorable v0.1.13 // indirect
+ github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
	github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
@@ -85,10 +93,12 @@ require (
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
- golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.26.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
+ google.golang.org/grpc v1.67.1 // indirect
+ google.golang.org/protobuf v1.35.1 // indirect
)
diff --git a/go.sum b/go.sum
index e2aaddd..0ed534d 100644
--- a/go.sum
+++ b/go.sum
@@ -17,10 +17,18 @@ github.com/MarvinJWendt/testza v0.3.0/go.mod h1:eFcL4I0idjtIx8P9C6KkAuLgATNKpX4/
github.com/MarvinJWendt/testza v0.4.2/go.mod h1:mSdhXiKH8sg/gQehJ63bINcCKp7RtYewEjXsvsVUPbE=
github.com/MarvinJWendt/testza v0.5.2 h1:53KDo64C1z/h/d/stCYCPY69bt/OSwjq5KpFNwi+zB4=
github.com/MarvinJWendt/testza v0.5.2/go.mod h1:xu53QFE5sCdjtMCKk8YMQ2MnymimEctc4n3EjyIYvEY=
+github.com/alecthomas/assert/v2 v2.3.0 h1:mAsH2wmvjsuvyBvAmCtm7zFsBlb8mIHx5ySLVdDZXL0=
+github.com/alecthomas/assert/v2 v2.3.0/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ=
+github.com/alecthomas/participle/v2 v2.1.0 h1:z7dElHRrOEEq45F2TG5cbQihMtNTv8vwldytDj7Wrz4=
+github.com/alecthomas/participle/v2 v2.1.0/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c=
+github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk=
+github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/apache/arrow-go/v18 v18.0.1-0.20241022184425-56b794f52a9b h1:E79+ggEd/cv9g4iLe1B8XyBhpqlvqKbCGDmf+RPSwbA=
github.com/apache/arrow-go/v18 v18.0.1-0.20241022184425-56b794f52a9b/go.mod h1:kVPeNv6eFSRhkfWZx1BIRXU6EZnp5g2NqKsuJmKXsO8=
+github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f h1:k14GhTGJuvq27vRgLxf4iuufzLt7GeN3UOytJmU7W/A=
+github.com/apache/arrow-go/v18 v18.0.1-0.20241029153821-f0c5d9939d3f/go.mod h1:kVPeNv6eFSRhkfWZx1BIRXU6EZnp5g2NqKsuJmKXsO8=
github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
@@ -64,17 +72,31 @@ github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw=
github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U=
+github.com/creasty/defaults v1.8.0 h1:z27FJxCAa0JKt3utc0sCImAEb+spPucmKoOdLHvHYKk=
+github.com/creasty/defaults v1.8.0/go.mod h1:iGzKe6pbEHnpMPtfDXZEr0NVxWnPTjb1bbDy08fPzYM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
+github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
+github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
+github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
+github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no=
+github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
+github.com/go-playground/validator/v10 v10.11.1 h1:prmOlTVv+YjZjmRmNSF3VmspqJIxJWXmqUsHwfTRRkQ=
+github.com/go-playground/validator/v10 v10.11.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
+github.com/goccy/go-yaml v1.11.0 h1:n7Z+zx8S9f9KgzG6KtQKf+kwqXZlLNR2F6018Dgau54=
+github.com/goccy/go-yaml v1.11.0/go.mod h1:H+mJrWtjPTJAHvRbV09MCK9xYwODM+wRTVFFTWckfng=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81AtFUxJM+yDthflgJGkI=
github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -84,6 +106,8 @@ github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w=
github.com/hamba/avro/v2 v2.27.0 h1:IAM4lQ0VzUIKBuo4qlAiLKfqALSrFC+zi1iseTtbBKU=
github.com/hamba/avro/v2 v2.27.0/go.mod h1:jN209lopfllfrz7IGoZErlDz+AyUJ3vrBePQFZwYf5I=
+github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
+github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
@@ -95,14 +119,22 @@ github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuOb
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
-github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
+github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
+github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4=
github.com/lithammer/fuzzysearch v1.1.8/go.mod h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ5ajtkr5xPLts4=
+github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U=
github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
@@ -133,6 +165,8 @@ github.com/pterm/pterm v0.12.79/go.mod h1:1v/gzOF1N0FsjbgTHZ1wVycRkKiatFvJSJC4IG
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
+github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -144,6 +178,8 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/substrait-io/substrait-go v1.1.0 h1:wUXoXV/ESMXgaOWu/05kvMI/UBUyhtaTRfkT5p1b5Ck=
+github.com/substrait-io/substrait-go v1.1.0/go.mod h1:LHzL5E0VL620yw4kBQCP+sQPmxhepPTQMDJQRbOe/T4=
github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
github.com/twmb/murmur3 v1.1.8/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/wolfeidau/s3iofs v1.5.2 h1:2dmzSxdrSY29GsILVheJqBbURVQX3KglggSBtVWCYj4=
@@ -160,6 +196,8 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
+golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
@@ -186,7 +224,9 @@ golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@@ -214,9 +254,16 @@ golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSm
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0=
gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
+google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
+google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
+google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
+google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/literals.go b/literals.go
index 2e16d02..be4bcd9 100644
--- a/literals.go
+++ b/literals.go
@@ -151,8 +151,12 @@ func LiteralFromBytes(typ Type, data []byte) (Literal, error) {
 		return v, err
 	case FixedType:
 		if len(data) != t.Len() {
-			return nil, fmt.Errorf("%w: expected length %d for type %s, got %d",
-				ErrInvalidBinSerialization, t.Len(), t, len(data))
+			// Some writers store only a prefix of the fixed-length value for
+			// lower/upper bounds rather than the full length, so pad the data
+			// out to the full length when unpacking a fixed-length literal.
+			padded := make([]byte, t.Len())
+			copy(padded, data)
+			data = padded
 		}
var v FixedLiteral
err := v.UnmarshalBinary(data)
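The padding behavior above, illustrated as a standalone sketch (not the library code): a truncated bound for a fixed[n] column is zero-extended to the declared length before decoding.

package main

import "fmt"

// padToFixed zero-extends a truncated fixed-length bound to n bytes,
// mirroring the padding that LiteralFromBytes now performs.
func padToFixed(data []byte, n int) []byte {
	padded := make([]byte, n)
	copy(padded, data)
	return padded
}

func main() {
	fmt.Println(padToFixed([]byte{0xde, 0xad}, 4)) // [222 173 0 0]
}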
diff --git a/manifest.go b/manifest.go
index e14d56e..700702b 100644
--- a/manifest.go
+++ b/manifest.go
@@ -363,7 +363,7 @@ func (m *manifestFileV2) FetchEntries(fs iceio.IO, discardDeleted bool) ([]Manif
return fetchManifestEntries(m, fs, discardDeleted)
}
-func getFieldIDMap(sc avro.Schema) map[string]int {
+func getFieldIDMap(sc avro.Schema) (map[string]int, map[int]avro.LogicalType) {
getField := func(rs *avro.RecordSchema, name string) *avro.Field {
for _, f := range rs.Fields() {
if f.Name() == name {
@@ -374,19 +374,31 @@ func getFieldIDMap(sc avro.Schema) map[string]int {
}
result := make(map[string]int)
+ logicalTypes := make(map[int]avro.LogicalType)
entryField := getField(sc.(*avro.RecordSchema), "data_file")
	partitionField := getField(entryField.Type().(*avro.RecordSchema), "partition")
	for _, field := range partitionField.Type().(*avro.RecordSchema).Fields() {
		if fid, ok := field.Prop("field-id").(float64); ok {
			result[field.Name()] = int(fid)
+			avroTyp := field.Type()
+			if us, ok := avroTyp.(*avro.UnionSchema); ok {
+				for _, t := range us.Types() {
+					avroTyp = t
+				}
+			}
+
+			if ps, ok := avroTyp.(*avro.PrimitiveSchema); ok && ps.Logical() != nil {
+ logicalTypes[int(fid)] = ps.Logical().Type()
+ }
}
}
- return result
+ return result, logicalTypes
}
type hasFieldToIDMap interface {
setFieldNameToIDMap(map[string]int)
+ setFieldIDToLogicalTypeMap(map[int]avro.LogicalType)
}
func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]ManifestEntry, error) {
@@ -407,7 +419,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
return nil, err
}
- fieldNameToID := getFieldIDMap(sc)
+ fieldNameToID, fieldIDToLogicalType := getFieldIDMap(sc)
isVer1, isFallback := true, false
if string(metadata["format-version"]) == "2" {
isVer1 = false
@@ -447,6 +459,7 @@ func fetchManifestEntries(m ManifestFile, fs iceio.IO, discardDeleted bool) ([]M
 			tmp.inheritSeqNum(m)
 			if fieldToIDMap, ok := tmp.DataFile().(hasFieldToIDMap); ok {
 				fieldToIDMap.setFieldNameToIDMap(fieldNameToID)
+				fieldToIDMap.setFieldIDToLogicalTypeMap(fieldIDToLogicalType)
}
results = append(results, tmp)
}
@@ -627,52 +640,29 @@ func avroColMapToMap[K comparable, V any](c *[]colMap[K, V]) map[K]V {
return out
}
-func avroPartitionData(input map[string]any) map[string]any {
-	// hamba/avro/v2 will unmarshal a map[string]any such that
-	// each entry will actually be a map[string]any with the key being
-	// the avro type, not the field name.
-	//
-	// This means that partition data that looks like this:
-	//
-	// [{"field-id": 1000, "name": "ts", "type": {"type": "int", "logicalType": "date"}}]
-	//
-	// Becomes:
-	//
-	// map[string]any{"ts": map[string]any{"int.date": time.Time{}}}
-	//
-	// so we need to simplify our map and make the partition data handling easier
+func avroPartitionData(input map[string]any, nameToID map[string]int, logicalTypes map[int]avro.LogicalType) map[string]any {
 	out := make(map[string]any)
 	for k, v := range input {
-		switch v := v.(type) {
-		case map[string]any:
-			for typeName, val := range v {
-				switch typeName {
-				case "int.date":
-					out[k] = Date(val.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
-				case "int.time-millis":
-					out[k] = Time(val.(time.Duration).Microseconds())
-				case "long.time-micros":
-					out[k] = Time(val.(time.Duration).Microseconds())
-				case "long.timestamp-millis":
-					out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-				case "long.timestamp-micros":
-					out[k] = Timestamp(val.(time.Time).UTC().UnixMicro())
-				case "bytes.decimal":
-					// not implemented yet
-				case "fixed.decimal":
-					// not implemented yet
+		if id, ok := nameToID[k]; ok {
+			if logical, ok := logicalTypes[id]; ok {
+				switch logical {
+				case avro.Date:
+					out[k] = Date(v.(time.Time).Truncate(24*time.Hour).Unix() / int64((time.Hour * 24).Seconds()))
+				case avro.TimeMillis:
+					out[k] = Time(v.(time.Duration).Milliseconds())
+				case avro.TimeMicros:
+					out[k] = Time(v.(time.Duration).Microseconds())
+				case avro.TimestampMillis:
+					out[k] = Timestamp(v.(time.Time).UTC().UnixMilli())
+				case avro.TimestampMicros:
+					out[k] = Timestamp(v.(time.Time).UTC().UnixMicro())
 				default:
-					out[k] = val
+					out[k] = v
 				}
-			}
-		default:
-			switch v := v.(type) {
-			case time.Time:
-				out[k] = Timestamp(v.UTC().UnixMicro())
-			default:
-				out[k] = v
+				continue
 			}
 		}
+		out[k] = v
 	}
return out
}
@@ -708,7 +698,8 @@ type dataFile struct {
// not used for anything yet, but important to maintain the information
// for future development and updates such as when we get to writes,
// and scan planning
- fieldNameToID map[string]int
+ fieldNameToID map[string]int
+ fieldIDToLogicalType map[int]avro.LogicalType
initMaps sync.Once
}
@@ -722,11 +713,14 @@ func (d *dataFile) initializeMapData() {
d.distinctCntMap = avroColMapToMap(d.DistinctCounts)
d.lowerBoundMap = avroColMapToMap(d.LowerBounds)
d.upperBoundMap = avroColMapToMap(d.UpperBounds)
-	d.PartitionData = avroPartitionData(d.PartitionData)
+	d.PartitionData = avroPartitionData(d.PartitionData, d.fieldNameToID, d.fieldIDToLogicalType)
 	})
 }
 func (d *dataFile) setFieldNameToIDMap(m map[string]int) { d.fieldNameToID = m }
+func (d *dataFile) setFieldIDToLogicalTypeMap(m map[int]avro.LogicalType) {
+ d.fieldIDToLogicalType = m
+}
func (d *dataFile) ContentType() ManifestEntryContent { return d.Content }
func (d *dataFile) FilePath() string { return d.Path }
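As a standalone illustration of the avro.Date branch above (a sketch, not the library code): an Iceberg Date is the number of days since the Unix epoch, so the decoded time.Time is truncated to midnight and its Unix seconds divided by the seconds in a day.

package main

import (
	"fmt"
	"time"
)

// toDays mirrors the avro.Date conversion: truncate to midnight, then
// divide the Unix timestamp by 86400 seconds to get days since epoch.
func toDays(t time.Time) int32 {
	secs := t.Truncate(24 * time.Hour).Unix()
	return int32(secs / int64((24 * time.Hour).Seconds()))
}

func main() {
	d := time.Date(2024, time.November, 8, 9, 47, 0, 0, time.UTC)
	fmt.Println(toDays(d)) // 20035 days since 1970-01-01
}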
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
new file mode 100644
index 0000000..814ae77
--- /dev/null
+++ b/table/arrow_scanner.go
@@ -0,0 +1,601 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "io"
+ "iter"
+ "runtime"
+ "strconv"
+ "sync"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute"
+ "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table/internal"
+ "github.com/apache/iceberg-go/table/substrait"
+ "github.com/substrait-io/substrait-go/expr"
+ "golang.org/x/sync/errgroup"
+)
+
+const (
+ ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
+)
+
+type positionDeletes = []*arrow.Chunked
+type perFilePosDeletes = map[string]positionDeletes
+
+func readAllDeleteFiles(ctx context.Context, fs iceio.IO, tasks []FileScanTask) (perFilePosDeletes, error) {
+ var (
+ deletesPerFile = make(perFilePosDeletes)
+ uniqueDeletes = make(map[string]iceberg.DataFile)
+ err error
+ )
+
+ for _, t := range tasks {
+ for _, d := range t.DeleteFiles {
+ if d.ContentType() != iceberg.EntryContentPosDeletes {
+ continue
+ }
+
+ if _, ok := uniqueDeletes[d.FilePath()]; !ok {
+ uniqueDeletes[d.FilePath()] = d
+ }
+ }
+ }
+
+ if len(uniqueDeletes) == 0 {
+ return deletesPerFile, nil
+ }
+
+ g, ctx := errgroup.WithContext(ctx)
+ g.SetLimit(runtime.NumCPU())
+
+ perFileChan := make(chan map[string]*arrow.Chunked, runtime.NumCPU())
+ go func() {
+ defer close(perFileChan)
+ for _, v := range uniqueDeletes {
+ g.Go(func() error {
+ deletes, err := readDeletes(ctx, fs, v)
+ if deletes != nil {
+ perFileChan <- deletes
+ }
+ return err
+ })
+ }
+
+ err = g.Wait()
+ }()
+
+ for deletes := range perFileChan {
+ for file, arr := range deletes {
+ deletesPerFile[file] = append(deletesPerFile[file], arr)
+ }
+ }
+
+ return deletesPerFile, err
+}
+
+func readDeletes(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile) (map[string]*arrow.Chunked, error) {
+ src, err := internal.GetFile(ctx, fs, dataFile, true)
+ if err != nil {
+ return nil, err
+ }
+
+ rdr, err := src.GetReader(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer rdr.Close()
+
+ tbl, err := rdr.ReadTable(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer tbl.Release()
+
+ tbl, err = array.UnifyTableDicts(compute.GetAllocator(ctx), tbl)
+ if err != nil {
+ return nil, err
+ }
+ defer tbl.Release()
+
+	filePathCol := tbl.Column(tbl.Schema().FieldIndices("file_path")[0]).Data()
+	posCol := tbl.Column(tbl.Schema().FieldIndices("pos")[0]).Data()
+	dict := filePathCol.Chunk(0).(*array.Dictionary).Dictionary().(*array.String)
+
+	results := make(map[string]*arrow.Chunked)
+	for i := 0; i < dict.Len(); i++ {
+		v := dict.Value(i)
+
+		mask, err := compute.CallFunction(ctx, "equal", nil,
+			compute.NewDatumWithoutOwning(filePathCol), compute.NewDatum(v))
+		if err != nil {
+			return nil, err
+		}
+		defer mask.Release()
+
+		filtered, err := compute.Filter(ctx, compute.NewDatumWithoutOwning(posCol),
+			mask, *compute.DefaultFilterOptions())
+ if err != nil {
+ return nil, err
+ }
+
+ results[v] = filtered.(*compute.ChunkedDatum).Value
+ }
+ return results, nil
+}
+
+type set[T comparable] map[T]struct{}
+
+func combinePositionalDeletes(mem memory.Allocator, deletes set[int64], start, end int64) arrow.Array {
+ bldr := array.NewInt64Builder(mem)
+ defer bldr.Release()
+
+ for i := start; i < end; i++ {
+ if _, ok := deletes[i]; !ok {
+ bldr.Append(i)
+ }
+ }
+ return bldr.NewArray()
+}
+
+type recProcessFn func(arrow.Record) (arrow.Record, error)
+
+func processPositionalDeletes(ctx context.Context, deletes set[int64]) recProcessFn {
+ nextIdx, mem := int64(0), compute.GetAllocator(ctx)
+ return func(r arrow.Record) (arrow.Record, error) {
+ defer r.Release()
+
+ currentIdx := nextIdx
+ nextIdx += r.NumRows()
+
+		indices := combinePositionalDeletes(mem, deletes, currentIdx, nextIdx)
+		defer indices.Release()
+
+		out, err := compute.Take(ctx, *compute.DefaultTakeOptions(),
+			compute.NewDatumWithoutOwning(r), compute.NewDatumWithoutOwning(indices))
+ if err != nil {
+ return nil, err
+ }
+
+ return out.(*compute.RecordDatum).Value, nil
+ }
+}
+
+func filterRecords(ctx context.Context, recordFilter expr.Expression) recProcessFn {
+	return func(rec arrow.Record) (arrow.Record, error) {
+		defer rec.Release()
+
+		input := compute.NewDatumWithoutOwning(rec)
+		mask, err := exprs.ExecuteScalarExpression(ctx, rec.Schema(), recordFilter, input)
+		if err != nil {
+			return nil, err
+		}
+		defer mask.Release()
+
+		result, err := compute.Filter(ctx, input, mask, *compute.DefaultFilterOptions())
+ if err != nil {
+ return nil, err
+ }
+
+ return result.(*compute.RecordDatum).Value, nil
+ }
+}
+
+type arrowScan struct {
+ fs iceio.IO
+ metadata Metadata
+ projectedSchema *iceberg.Schema
+ boundRowFilter iceberg.BooleanExpression
+ caseSensitive bool
+ rowLimit int64
+ options iceberg.Properties
+
+ useLargeTypes bool
+}
+
+func (as *arrowScan) projectedFieldIDs() (set[int], error) {
+ idset := set[int]{}
+ for _, field := range as.projectedSchema.Fields() {
+ switch field.Type.(type) {
+ case *iceberg.MapType, *iceberg.ListType:
+ default:
+ idset[field.ID] = struct{}{}
+ }
+ }
+
+ if as.boundRowFilter != nil {
+ extracted, err := iceberg.ExtractFieldIDs(as.boundRowFilter)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, id := range extracted {
+ idset[id] = struct{}{}
+ }
+ }
+
+ return idset, nil
+}
+
+type enumeratedRecord struct {
+ Record internal.Enumerated[arrow.Record]
+ Task internal.Enumerated[FileScanTask]
+ Err error
+}
+
+func (as *arrowScan) prepareToRead(ctx context.Context, file iceberg.DataFile) (*iceberg.Schema, []int, internal.FileReader, error) {
+ ids, err := as.projectedFieldIDs()
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ src, err := internal.GetFile(ctx, as.fs, file, false)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ rdr, err := src.GetReader(ctx)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+
+ fileSchema, colIndices, err := rdr.PrunedSchema(ids)
+ if err != nil {
+ rdr.Close()
+ return nil, nil, nil, err
+ }
+
+ iceSchema, err := ArrowSchemaToIceberg(fileSchema, false, nil)
+ if err != nil {
+ rdr.Close()
+ return nil, nil, nil, err
+ }
+
+ return iceSchema, colIndices, rdr, nil
+}
+
+func (as *arrowScan) getRecordFilter(ctx context.Context, fileSchema *iceberg.Schema) (recProcessFn, bool, error) {
+	if as.boundRowFilter == nil || as.boundRowFilter.Equals(iceberg.AlwaysTrue{}) {
+		return nil, false, nil
+	}
+
+	translatedFilter, err := iceberg.TranslateColumnNames(as.boundRowFilter, fileSchema)
+	if err != nil {
+		return nil, false, err
+	}
+
+	if translatedFilter.Equals(iceberg.AlwaysFalse{}) {
+		return nil, true, nil
+	}
+
+	translatedFilter, err = iceberg.BindExpr(fileSchema, translatedFilter, as.caseSensitive)
+	if err != nil {
+		return nil, false, err
+	}
+
+	if !translatedFilter.Equals(iceberg.AlwaysTrue{}) {
+		extSet, recordFilter, err := substrait.ConvertExpr(fileSchema, translatedFilter, as.caseSensitive)
+		if err != nil {
+			return nil, false, err
+		}
+
+		ctx = exprs.WithExtensionIDSet(ctx, exprs.NewExtensionSetDefault(*extSet))
+		return filterRecords(ctx, recordFilter), false, nil
+	}
+
+	return nil, false, nil
+}
+
+func (as *arrowScan) processRecords(
+ ctx context.Context,
+ task internal.Enumerated[FileScanTask],
+ fileSchema *iceberg.Schema,
+ rdr internal.FileReader,
+ columns []int,
+ pipeline []recProcessFn,
+ out chan<- enumeratedRecord) (err error) {
+
+ var (
+ testRowGroups any
+ recRdr array.RecordReader
+ )
+
+ switch task.Value.File.FileFormat() {
+ case iceberg.ParquetFile:
+		testRowGroups, err = newParquetRowGroupStatsEvaluator(fileSchema, as.boundRowFilter, false)
+ if err != nil {
+ return err
+ }
+ }
+
+ recRdr, err = rdr.GetRecords(ctx, columns, testRowGroups)
+ if err != nil {
+ return err
+ }
+ defer recRdr.Release()
+
+ var (
+ idx int
+ prev arrow.Record
+ )
+
+ for recRdr.Next() {
+ if prev != nil {
+			out <- enumeratedRecord{Record: internal.Enumerated[arrow.Record]{
+				Value: prev, Index: idx, Last: false}, Task: task}
+ idx++
+ }
+
+ prev = recRdr.Record()
+ prev.Retain()
+
+ for _, f := range pipeline {
+ prev, err = f(prev)
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ if prev != nil {
+		out <- enumeratedRecord{Record: internal.Enumerated[arrow.Record]{
+			Value: prev, Index: idx, Last: true}, Task: task}
+ }
+
+ if recRdr.Err() != nil && recRdr.Err() != io.EOF {
+ err = recRdr.Err()
+ }
+
+ return err
+}
+
+func (as *arrowScan) recordsFromTask(ctx context.Context, task internal.Enumerated[FileScanTask], out chan<- enumeratedRecord, positionalDeletes positionDeletes) (err error) {
+ defer func() {
+ if err != nil {
+ out <- enumeratedRecord{Task: task, Err: err}
+ }
+ }()
+
+ var (
+ rdr internal.FileReader
+ iceSchema *iceberg.Schema
+ colIndices []int
+ filterFunc recProcessFn
+ dropFile bool
+ )
+
+ iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
+ if err != nil {
+ return
+ }
+
+ pipeline := make([]recProcessFn, 0, 2)
+ if len(positionalDeletes) > 0 {
+ deletes := set[int64]{}
+ for _, chunk := range positionalDeletes {
+ for _, a := range chunk.Chunks() {
+				for _, v := range a.(*array.Int64).Int64Values() {
+ deletes[v] = struct{}{}
+ }
+ }
+ }
+
+		pipeline = append(pipeline, processPositionalDeletes(ctx, deletes))
+ }
+
+ filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
+ if err != nil {
+ return
+ }
+
+ if dropFile {
+ var emptySchema *arrow.Schema
+		emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
+		if err != nil {
+			return err
+		}
+		out <- enumeratedRecord{Task: task, Record: internal.Enumerated[arrow.Record]{
+			Value: array.NewRecord(emptySchema, nil, 0), Index: 0, Last: true}}
+ return
+ }
+
+ if filterFunc != nil {
+ pipeline = append(pipeline, filterFunc)
+ }
+
+	pipeline = append(pipeline, func(r arrow.Record) (arrow.Record, error) {
+		defer r.Release()
+		return ToRequestedSchema(as.projectedSchema, iceSchema, r, false, false, as.useLargeTypes)
+	})
+
+	err = as.processRecords(ctx, task, iceSchema, rdr, colIndices, pipeline, out)
+	return
+}
+
+func createIterator(ctx context.Context, numWorkers uint, records <-chan enumeratedRecord, deletesPerFile perFilePosDeletes, cancel context.CancelFunc, rowLimit int64) iter.Seq2[arrow.Record, error] {
+ isBeforeAny := func(batch enumeratedRecord) bool {
+ return batch.Task.Index < 0
+ }
+
+ sequenced := internal.MakeSequencedChan(uint(numWorkers), records,
+ func(left, right *enumeratedRecord) bool {
+ switch {
+ case isBeforeAny(*left):
+ return true
+ case isBeforeAny(*right):
+ return false
+ case left.Err != nil || right.Err != nil:
+ return true
+ case left.Task.Index == right.Task.Index:
+ return left.Record.Index < right.Record.Index
+ default:
+ return left.Task.Index < right.Task.Index
+ }
+ }, func(prev, next *enumeratedRecord) bool {
+ switch {
+ case isBeforeAny(*prev):
+				return next.Task.Index == 0 && next.Record.Index == 0
+ case next.Err != nil:
+ return true
+ case prev.Task.Index == next.Task.Index:
+ return next.Record.Index == prev.Record.Index+1
+ default:
+				return next.Task.Index == prev.Task.Index+1 &&
+					prev.Record.Last && next.Record.Index == 0
+ }
+		}, enumeratedRecord{Task: internal.Enumerated[FileScanTask]{Index: -1}})
+
+ totalRowCount := int64(0)
+ return func(yield func(arrow.Record, error) bool) {
+ defer func() {
+ for rec := range sequenced {
+ if rec.Record.Value != nil {
+ rec.Record.Value.Release()
+ }
+ }
+
+ for _, v := range deletesPerFile {
+ for _, chunk := range v {
+ chunk.Release()
+ }
+ }
+ }()
+
+ defer cancel()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case enum, ok := <-sequenced:
+ if !ok {
+ return
+ }
+
+ if enum.Err != nil {
+ yield(nil, enum.Err)
+ return
+ }
+
+ rec := enum.Record.Value
+ if rowLimit > 0 {
+ if totalRowCount >= rowLimit {
+ rec.Release()
+ return
+				} else if totalRowCount+rec.NumRows() > rowLimit {
+					defer rec.Release()
+					rec = rec.NewSlice(0, rowLimit-totalRowCount)
+ }
+ }
+
+ if rec.NumRows() == 0 {
+ // skip empty records
+ continue
+ }
+
+ if !yield(rec, nil) {
+ return
+ }
+ totalRowCount += rec.NumRows()
+ if rowLimit > 0 && totalRowCount >= rowLimit {
+ return
+ }
+ }
+ }
+ }
+}
+
+func (as *arrowScan) recordBatchesFromTasksAndDeletes(ctx context.Context, tasks []FileScanTask, deletesPerFile perFilePosDeletes) iter.Seq2[arrow.Record, error] {
+ extSet := substrait.NewExtensionSet()
+
+ ctx, cancel := context.WithCancel(exprs.WithExtensionIDSet(ctx, extSet))
+ taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))
+
+	numWorkers := min(runtime.NumCPU(), len(tasks))
+ records := make(chan enumeratedRecord, numWorkers)
+
+ var wg sync.WaitGroup
+ wg.Add(numWorkers)
+ for i := 0; i < numWorkers; i++ {
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case task, ok := <-taskChan:
+ if !ok {
+ return
+ }
+
+				if err := as.recordsFromTask(ctx, task, records,
+					deletesPerFile[task.Value.File.FilePath()]); err != nil {
+ cancel()
+ return
+ }
+ }
+ }
+ }()
+ }
+
+ go func() {
+ for i, t := range tasks {
+ taskChan <- internal.Enumerated[FileScanTask]{
+ Value: t, Index: i, Last: i == len(tasks)-1}
+ }
+ close(taskChan)
+
+ wg.Wait()
+ close(records)
+ }()
+
+ return createIterator(ctx, uint(numWorkers), records, deletesPerFile,
+ cancel, as.rowLimit)
+}
+
+func (as *arrowScan) GetRecords(ctx context.Context, tasks []FileScanTask) (*arrow.Schema, iter.Seq2[arrow.Record, error], error) {
+	var err error
+	as.useLargeTypes, err = strconv.ParseBool(as.options.Get(ScanOptionArrowUseLargeTypes, "false"))
+	if err != nil {
+		as.useLargeTypes = false
+	}
+
+	resultSchema, err := SchemaToArrowSchema(as.projectedSchema, nil, false, as.useLargeTypes)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if as.rowLimit == 0 {
+		return resultSchema, func(yield func(arrow.Record, error) bool) {}, nil
+ }
+
+ deletesPerFile, err := readAllDeleteFiles(ctx, as.fs, tasks)
+ if err != nil {
+ return nil, nil, err
+ }
+
+	return resultSchema, as.recordBatchesFromTasksAndDeletes(ctx, tasks, deletesPerFile), nil
+}
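The core of the positional-delete handling above, reduced to a standalone sketch: for rows [start, end) of a data file, keep every index not present in the delete set; those indices are then handed to Arrow's take kernel.

package main

import "fmt"

// keepIndices computes the surviving row indices for one record batch,
// the same selection combinePositionalDeletes builds as an Int64 array.
func keepIndices(deleted map[int64]struct{}, start, end int64) []int64 {
	out := make([]int64, 0, end-start)
	for i := start; i < end; i++ {
		if _, ok := deleted[i]; !ok {
			out = append(out, i)
		}
	}
	return out
}

func main() {
	deleted := map[int64]struct{}{1: {}, 3: {}}
	fmt.Println(keepIndices(deleted, 0, 5)) // [0 2 4]
}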
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index b44d06f..310fbfb 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -465,6 +465,7 @@ func ensureSmallArrowTypes(dt arrow.DataType) (arrow.DataType, error) {
type convertToArrow struct {
metadata map[string]string
includeFieldIDs bool
+ useLargeTypes bool
}
func (c convertToArrow) Schema(_ *iceberg.Schema, result arrow.Field) arrow.Field {
@@ -554,11 +555,17 @@ func (c convertToArrow) VisitTimestamp() arrow.Field {
}
func (c convertToArrow) VisitString() arrow.Field {
- return arrow.Field{Type: arrow.BinaryTypes.LargeString}
+ if c.useLargeTypes {
+ return arrow.Field{Type: arrow.BinaryTypes.LargeString}
+ }
+ return arrow.Field{Type: arrow.BinaryTypes.String}
}
func (c convertToArrow) VisitBinary() arrow.Field {
- return arrow.Field{Type: arrow.BinaryTypes.LargeBinary}
+ if c.useLargeTypes {
+ return arrow.Field{Type: arrow.BinaryTypes.LargeBinary}
+ }
+ return arrow.Field{Type: arrow.BinaryTypes.Binary}
}
func (c convertToArrow) VisitUUID() arrow.Field {
@@ -569,8 +576,9 @@ func (c convertToArrow) VisitUUID() arrow.Field {
// is non-nil, it will be included as the top-level metadata in the schema. If includeFieldIDs
// is true, then each field of the schema will contain a metadata key PARQUET:field_id set to
// the field id from the iceberg schema.
-func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, includeFieldIDs bool) (*arrow.Schema, error) {
-	top, err := iceberg.Visit(sc, convertToArrow{metadata: metadata, includeFieldIDs: includeFieldIDs})
+func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, includeFieldIDs, useLargeTypes bool) (*arrow.Schema, error) {
+	top, err := iceberg.Visit(sc, convertToArrow{metadata: metadata,
+		includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes})
if err != nil {
return nil, err
}
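The conversion now defaults to the smaller Arrow types; callers opt back into the large variants with the new parameter. A short sketch (the schema here is illustrative):

package main

import (
	"fmt"

	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/table"
)

func main() {
	sc := iceberg.NewSchema(0,
		iceberg.NestedField{ID: 1, Name: "name", Type: iceberg.PrimitiveTypes.String},
	)

	small, _ := table.SchemaToArrowSchema(sc, nil, false, false) // name: utf8
	large, _ := table.SchemaToArrowSchema(sc, nil, false, true)  // name: large_utf8
	fmt.Println(small)
	fmt.Println(large)
}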
diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go
index 9c15985..4dc12f2 100644
--- a/table/arrow_utils_test.go
+++ b/table/arrow_utils_test.go
@@ -76,16 +76,16 @@ func TestArrowToIceberg(t *testing.T) {
 		{&arrow.TimestampType{Unit: arrow.Microsecond}, iceberg.PrimitiveTypes.Timestamp, true, ""},
 		{&arrow.TimestampType{Unit: arrow.Nanosecond}, nil, false, "'ns' timestamp precision not supported"},
 		{&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "US/Pacific"}, nil, false, "unsupported arrow type for conversion - timestamp[us, tz=US/Pacific]"},
-		{arrow.BinaryTypes.String, iceberg.PrimitiveTypes.String, false, ""},
-		{arrow.BinaryTypes.LargeString, iceberg.PrimitiveTypes.String, true, ""},
+		{arrow.BinaryTypes.String, iceberg.PrimitiveTypes.String, true, ""},
+		{arrow.BinaryTypes.LargeString, iceberg.PrimitiveTypes.String, false, ""},
 		{arrow.BinaryTypes.StringView, nil, false, "unsupported arrow type for conversion - string_view"},
-		{arrow.BinaryTypes.Binary, iceberg.PrimitiveTypes.Binary, false, ""},
-		{arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, true, ""},
+		{arrow.BinaryTypes.Binary, iceberg.PrimitiveTypes.Binary, true, ""},
+		{arrow.BinaryTypes.LargeBinary, iceberg.PrimitiveTypes.Binary, false, ""},
 		{arrow.BinaryTypes.BinaryView, nil, false, "unsupported arrow type for conversion - binary_view"},
 		{extensions.NewUUIDType(), iceberg.PrimitiveTypes.UUID, true, ""},
{arrow.StructOf(arrow.Field{
Name: "foo",
- Type: arrow.BinaryTypes.LargeString,
+ Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: arrow.MetadataFrom(map[string]string{
table.ArrowParquetFieldIDKey: "1",
table.ArrowFieldDocKey: "foo doc",
@@ -137,7 +137,7 @@ func TestArrowToIceberg(t *testing.T) {
}, false, ""},
 		{arrow.MapOfWithMetadata(arrow.PrimitiveTypes.Int32, fieldIDMeta("1"),
-			arrow.BinaryTypes.LargeString, fieldIDMeta("2")),
+			arrow.BinaryTypes.String, fieldIDMeta("2")),
 			&iceberg.MapType{
 				KeyID: 1, KeyType: iceberg.PrimitiveTypes.Int32,
 				ValueID: 2, ValueType: iceberg.PrimitiveTypes.String, ValueRequired: false,
@@ -311,7 +311,7 @@ func TestArrowSchemaRoundTripConversion(t *testing.T) {
}
for _, tt := range schemas {
- sc, err := table.SchemaToArrowSchema(tt, nil, true)
+ sc, err := table.SchemaToArrowSchema(tt, nil, true, false)
require.NoError(t, err)
ice, err := table.ArrowSchemaToIceberg(sc, false, nil)
diff --git a/table/evaluators.go b/table/evaluators.go
index 1458c98..e8b6bda 100644
--- a/table/evaluators.go
+++ b/table/evaluators.go
@@ -22,6 +22,7 @@ import (
"math"
"slices"
+ "github.com/apache/arrow-go/v18/parquet/metadata"
"github.com/apache/iceberg-go"
"github.com/google/uuid"
)
@@ -682,6 +683,21 @@ func newInclusiveMetricsEvaluator(s *iceberg.Schema, expr iceberg.BooleanExpress
}).Eval, nil
}
+func newParquetRowGroupStatsEvaluator(fileSchema *iceberg.Schema, expr iceberg.BooleanExpression,
+	includeEmptyFiles bool) (func(*metadata.RowGroupMetaData, []int) (bool, error), error) {
+
+ rewritten, err := iceberg.RewriteNotExpr(expr)
+ if err != nil {
+ return nil, err
+ }
+
+ return (&inclusiveMetricsEval{
+ st: fileSchema.AsStruct(),
+ includeEmptyFiles: includeEmptyFiles,
+ expr: rewritten,
+ }).TestRowGroup, nil
+}
+
type inclusiveMetricsEval struct {
metricsEvaluator
@@ -690,6 +706,46 @@ type inclusiveMetricsEval struct {
includeEmptyFiles bool
}
+func (m *inclusiveMetricsEval) TestRowGroup(rgmeta *metadata.RowGroupMetaData, colIndices []int) (bool, error) {
+ if !m.includeEmptyFiles && rgmeta.NumRows() == 0 {
+ return rowsCannotMatch, nil
+ }
+
+ m.valueCounts = make(map[int]int64)
+ m.nullCounts = make(map[int]int64)
+ m.nanCounts = nil
+ m.lowerBounds = make(map[int][]byte)
+ m.upperBounds = make(map[int][]byte)
+
+ for _, c := range colIndices {
+ colMeta, err := rgmeta.ColumnChunk(c)
+ if err != nil {
+ return false, err
+ }
+
+ if ok, err := colMeta.StatsSet(); !ok || err != nil {
+ continue
+ }
+
+ stats, err := colMeta.Statistics()
+ if err != nil {
+ return false, err
+ }
+
+ fieldID := int(stats.Descr().SchemaNode().FieldID())
+ m.valueCounts[fieldID] = stats.NumValues()
+ if stats.HasNullCount() {
+ m.nullCounts[fieldID] = stats.NullCount()
+ }
+ if stats.HasMinMax() {
+ m.lowerBounds[fieldID] = stats.EncodeMin()
+ m.upperBounds[fieldID] = stats.EncodeMax()
+ }
+ }
+
+ return iceberg.VisitExpr(m.expr, m)
+}
+
func (m *inclusiveMetricsEval) Eval(file iceberg.DataFile) (bool, error) {
if !m.includeEmptyFiles && file.Count() == 0 {
return rowsCannotMatch, nil
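TestRowGroup feeds parquet row-group statistics through the same inclusive-metrics logic used for whole files, so groups whose bounds cannot satisfy the filter are skipped before any decoding. A toy version of that pruning decision (integers stand in for the encoded bounds; not the library code):

package main

import "fmt"

// stats is a stand-in for one column's row-group statistics.
type stats struct{ min, max int64 }

// mightMatchGreaterThan reports whether any row in the group could
// satisfy "col > v"; if the group's max is <= v, it cannot match.
func mightMatchGreaterThan(s stats, v int64) bool {
	return s.max > v
}

func main() {
	groups := []stats{{0, 99}, {100, 199}, {200, 299}}
	for i, g := range groups {
		fmt.Printf("row group %d might match: %v\n", i, mightMatchGreaterThan(g, 150))
	}
	// prints false, true, true: only the last two groups are read
}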
diff --git a/table/internal/interfaces.go b/table/internal/interfaces.go
new file mode 100644
index 0000000..9d837fe
--- /dev/null
+++ b/table/internal/interfaces.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+)
+
+// GetFile opens the given file using the provided file system.
+//
+// The FileSource interface allows abstracting away the underlying file format
+// while providing utilities to read the file as Arrow record batches.
+func GetFile(ctx context.Context, fs iceio.IO, dataFile iceberg.DataFile, isPosDeletes bool) (FileSource, error) {
+ switch dataFile.FileFormat() {
+ case iceberg.ParquetFile:
+ return &ParquetFileSource{
+ mem: compute.GetAllocator(ctx),
+ fs: fs,
+ file: dataFile,
+ }, nil
+ default:
+ return nil, fmt.Errorf("%w: only parquet format is implemented,
got %s",
+ iceberg.ErrNotImplemented, dataFile.FileFormat())
+ }
+}
+
+type FileSource interface {
+ GetReader(context.Context) (FileReader, error)
+}
+
+type FileReader interface {
+ io.Closer
+
+	// PrunedSchema takes in the list of projected field IDs and returns the arrow schema
+	// that represents the underlying file schema with only the projected fields. It also
+	// returns the indexes of the projected columns to allow reading *only* the needed
+	// columns.
+	PrunedSchema(projectedIDs map[int]struct{}) (*arrow.Schema, []int, error)
+	// GetRecords returns a record reader for only the provided columns (using nil will read
+	// all of the columns of the underlying file). The `tester` is a function that can be used,
+	// if non-nil, to filter aspects of the file such as skipping row groups in a parquet file.
+	GetRecords(ctx context.Context, cols []int, tester any) (array.RecordReader, error)
+ // ReadTable reads the entire file and returns it as an arrow table.
+ ReadTable(context.Context) (arrow.Table, error)
+}
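A sketch of driving these interfaces end to end (exposition only: the package is internal to the module, so outside code cannot import it, and the field IDs are illustrative):

package main

import (
	"context"
	"io"

	"github.com/apache/iceberg-go"
	iceio "github.com/apache/iceberg-go/io"
	"github.com/apache/iceberg-go/table/internal"
)

// readOne reads a single data file, projecting field IDs 1 and 2.
func readOne(ctx context.Context, fs iceio.IO, df iceberg.DataFile) error {
	src, err := internal.GetFile(ctx, fs, df, false)
	if err != nil {
		return err
	}

	rdr, err := src.GetReader(ctx)
	if err != nil {
		return err
	}
	defer rdr.Close()

	_, cols, err := rdr.PrunedSchema(map[int]struct{}{1: {}, 2: {}})
	if err != nil {
		return err
	}

	rr, err := rdr.GetRecords(ctx, cols, nil) // nil tester: no row-group skipping
	if err != nil {
		return err
	}
	defer rr.Release()

	for rr.Next() {
		_ = rr.Record() // process the batch
	}
	if err := rr.Err(); err != nil && err != io.EOF {
		return err
	}
	return nil
}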
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
new file mode 100644
index 0000000..1c5d38e
--- /dev/null
+++ b/table/internal/parquet_files.go
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+ "context"
+ "fmt"
+ "slices"
+ "strconv"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/file"
+ "github.com/apache/arrow-go/v18/parquet/metadata"
+ "github.com/apache/arrow-go/v18/parquet/pqarrow"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+)
+
+type ParquetFileSource struct {
+ mem memory.Allocator
+ fs iceio.IO
+ file iceberg.DataFile
+}
+
+type wrapPqArrowReader struct {
+ *pqarrow.FileReader
+}
+
+func (w wrapPqArrowReader) Close() error {
+ return w.ParquetReader().Close()
+}
+
+func (w wrapPqArrowReader) PrunedSchema(projectedIDs map[int]struct{}) (*arrow.Schema, []int, error) {
+ return pruneParquetColumns(w.Manifest, projectedIDs, false)
+}
+
+func (w wrapPqArrowReader) GetRecords(ctx context.Context, cols []int, tester any) (array.RecordReader, error) {
+ var (
+ testRg func(*metadata.RowGroupMetaData, []int) (bool, error)
+ ok bool
+ )
+
+ if tester != nil {
+		testRg, ok = tester.(func(*metadata.RowGroupMetaData, []int) (bool, error))
+		if !ok {
+			return nil, fmt.Errorf("%w: invalid tester function", iceberg.ErrInvalidArgument)
+ }
+ }
+
+ var rgList []int
+ if testRg != nil {
+ rgList = make([]int, 0)
+		fileMeta, numRg := w.ParquetReader().MetaData(), w.ParquetReader().NumRowGroups()
+ for rg := 0; rg < numRg; rg++ {
+ rgMeta := fileMeta.RowGroup(rg)
+ use, err := testRg(rgMeta, cols)
+ if err != nil {
+ return nil, err
+ }
+
+ if use {
+ rgList = append(rgList, rg)
+ }
+ }
+ }
+
+ return w.GetRecordReader(ctx, cols, rgList)
+}
+
+func (pfs *ParquetFileSource) GetReader(ctx context.Context) (FileReader, error) {
+ pf, err := pfs.fs.Open(pfs.file.FilePath())
+ if err != nil {
+ return nil, err
+ }
+
+ rdr, err := file.NewParquetReader(pf,
+ file.WithReadProps(parquet.NewReaderProperties(pfs.mem)))
+ if err != nil {
+ return nil, err
+ }
+
+ // TODO: grab these from the context
+ arrProps := pqarrow.ArrowReadProperties{
+ Parallel: true,
+ BatchSize: 1 << 17,
+ }
+
+ if pfs.file.ContentType() == iceberg.EntryContentPosDeletes {
+		// read the file_path column as dictionary-encoded
+ arrProps.SetReadDict(0, true)
+ }
+
+ fr, err := pqarrow.NewFileReader(rdr, arrProps, pfs.mem)
+ if err != nil {
+ return nil, err
+ }
+
+ return wrapPqArrowReader{fr}, nil
+}
+
+type manifestVisitor[T any] interface {
+ Manifest(*pqarrow.SchemaManifest, []T) T
+ Field(pqarrow.SchemaField, T) T
+ Struct(pqarrow.SchemaField, []T) T
+ List(pqarrow.SchemaField, T) T
+ Map(pqarrow.SchemaField, T, T) T
+ Primitive(pqarrow.SchemaField) T
+}
+
+func visitParquetManifest[T any](manifest *pqarrow.SchemaManifest, visitor manifestVisitor[T]) (res T, err error) {
+	if manifest == nil {
+		err = fmt.Errorf("%w: cannot visit nil manifest", iceberg.ErrInvalidArgument)
+		return
+ }
+
+ defer func() {
+ if r := recover(); r != nil {
+ err = fmt.Errorf("%s", r)
+ }
+ }()
+
+ results := make([]T, len(manifest.Fields))
+ for i, f := range manifest.Fields {
+ res := visitManifestField(f, visitor)
+ results[i] = visitor.Field(f, res)
+ }
+ return visitor.Manifest(manifest, results), nil
+}
+
+func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+ results := make([]T, len(field.Children))
+
+ for i, f := range field.Children {
+ results[i] = visitManifestField(f, visitor)
+ }
+
+ return visitor.Struct(field, results)
+}
+
+func visitManifestList[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+ elemField := field.Children[0]
+ res := visitManifestField(elemField, visitor)
+ return visitor.List(field, res)
+}
+
+func visitManifestMap[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+ kvfield := field.Children[0]
+ keyField, valField := kvfield.Children[0], kvfield.Children[1]
+
+	return visitor.Map(field, visitManifestField(keyField, visitor), visitManifestField(valField, visitor))
+}
+
+func visitManifestField[T any](field pqarrow.SchemaField, visitor manifestVisitor[T]) T {
+ switch field.Field.Type.(type) {
+ case *arrow.StructType:
+ return visitParquetManifestStruct(field, visitor)
+ case *arrow.MapType:
+ return visitManifestMap(field, visitor)
+ case arrow.ListLikeType:
+ return visitManifestList(field, visitor)
+ default:
+ return visitor.Primitive(field)
+ }
+}
+
+func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected map[int]struct{}, selectFullTypes bool) (*arrow.Schema, []int, error) {
+ visitor := &pruneParquetSchema{
+ selected: selected,
+ manifest: manifest,
+ fullTypes: selectFullTypes,
+ indices: []int{},
+ }
+
+ result, err := visitParquetManifest[arrow.Field](manifest, visitor)
+ if err != nil {
+ return nil, nil, err
+ }
+
+	return arrow.NewSchema(result.Type.(*arrow.StructType).Fields(), &result.Metadata),
+		visitor.indices, nil
+}
+
+func getFieldID(f arrow.Field) *int {
+ if !f.HasMetadata() {
+ return nil
+ }
+
+ fieldIDStr, ok := f.Metadata.GetValue("PARQUET:field_id")
+ if !ok {
+ return nil
+ }
+
+ id, err := strconv.Atoi(fieldIDStr)
+ if err != nil {
+ return nil
+ }
+
+ return &id
+}
+
+type pruneParquetSchema struct {
+ selected map[int]struct{}
+ fullTypes bool
+ manifest *pqarrow.SchemaManifest
+
+ indices []int
+}
+
+func (p *pruneParquetSchema) fieldID(field arrow.Field) int {
+ if id := getFieldID(field); id != nil {
+ return *id
+ }
+
+ panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing
field_id",
+ iceberg.ErrInvalidSchema, field))
+}
+
+func (p *pruneParquetSchema) Manifest(manifest *pqarrow.SchemaManifest, fields []arrow.Field) arrow.Field {
+	finalFields := slices.DeleteFunc(fields, func(f arrow.Field) bool { return f.Type == nil })
+ result := arrow.Field{
+ Type: arrow.StructOf(finalFields...),
+ }
+ if manifest.SchemaMeta != nil {
+ result.Metadata = *manifest.SchemaMeta
+ }
+
+ return result
+}
+
+func (p *pruneParquetSchema) Struct(field pqarrow.SchemaField, children []arrow.Field) arrow.Field {
+ selected, fields := []arrow.Field{}, field.Children
+ sameType := true
+
+ for i, t := range children {
+ field := fields[i]
+ if arrow.TypeEqual(field.Field.Type, t.Type) {
+ selected = append(selected, *field.Field)
+ } else if t.Type == nil {
+ sameType = false
+			// type has changed, create a new field with the projected type
+ selected = append(selected, arrow.Field{
+ Name: field.Field.Name,
+ Type: field.Field.Type,
+ Nullable: field.Field.Nullable,
+ Metadata: field.Field.Metadata,
+ })
+ }
+ }
+
+ if len(selected) > 0 {
+ if len(selected) == len(fields) && sameType {
+ // nothing changed, return the original
+ return *field.Field
+ } else {
+ result := *field.Field
+ result.Type = arrow.StructOf(selected...)
+ return result
+ }
+ }
+
+ return arrow.Field{}
+}
+
+func (p *pruneParquetSchema) Field(field pqarrow.SchemaField, result arrow.Field) arrow.Field {
+ _, ok := p.selected[p.fieldID(*field.Field)]
+ if !ok {
+ if result.Type != nil {
+ return result
+ }
+
+ return arrow.Field{}
+ }
+
+ if p.fullTypes {
+ return *field.Field
+ }
+
+ if _, ok := field.Field.Type.(*arrow.StructType); ok {
+ result := *field.Field
+ result.Type = p.projectSelectedStruct(result.Type)
+ return result
+ }
+
+ if !field.IsLeaf() {
+ panic(fmt.Errorf("cannot explicitly project list or map types"))
+ }
+
+ p.indices = append(p.indices, field.ColIndex)
+ return *field.Field
+}
+
+func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult
arrow.Field) arrow.Field {
+ _, ok := p.selected[p.fieldID(*field.Children[0].Field)]
+ if !ok {
+ if elemResult.Type != nil {
+ result := *field.Field
+ result.Type =
p.projectList(field.Field.Type.(arrow.ListLikeType), elemResult.Type)
+ return result
+ }
+
+ return arrow.Field{}
+ }
+
+ if p.fullTypes {
+ return *field.Field
+ }
+
+ if _, ok = field.Children[0].Field.Type.(*arrow.StructType); ok {
+ result := *field.Field
+ projected := p.projectSelectedStruct(elemResult.Type)
+ result.Type =
p.projectList(field.Field.Type.(arrow.ListLikeType), projected)
+ return result
+ }
+
+ if !field.Children[0].IsLeaf() {
+ panic(fmt.Errorf("cannot explicitly project list or map types"))
+ }
+
+ p.indices = append(p.indices, field.ColIndex)
+ return *field.Field
+}
+
+func (p *pruneParquetSchema) Map(field pqarrow.SchemaField, keyResult,
valResult arrow.Field) arrow.Field {
+ _, ok := p.selected[p.fieldID(*field.Children[0].Children[1].Field)]
+ if !ok {
+ if valResult.Type != nil {
+ result := *field.Field
+ result.Type =
p.projectMap(field.Field.Type.(*arrow.MapType), valResult.Type)
+ return result
+ }
+
+ // the value field was pruned; if the key field is selected,
+ // keep the full original map
+ if _, ok =
p.selected[p.fieldID(*field.Children[0].Children[0].Field)]; ok {
+ return *field.Field
+ }
+
+ return arrow.Field{}
+ }
+
+ if p.fullTypes {
+ return *field.Field
+ }
+
+ _, ok = field.Children[0].Children[1].Field.Type.(*arrow.StructType)
+ if ok {
+ result := *field.Field
+ projected := p.projectSelectedStruct(valResult.Type)
+ result.Type = p.projectMap(field.Field.Type.(*arrow.MapType),
projected)
+ return result
+ }
+
+ if !field.Children[0].Children[1].IsLeaf() {
+ panic("cannot explicitly project list or map types")
+ }
+
+ return *field.Field
+}
+
+func (p *pruneParquetSchema) Primitive(field pqarrow.SchemaField) arrow.Field {
+ return arrow.Field{}
+}
+
+func (p *pruneParquetSchema) projectSelectedStruct(projected arrow.DataType)
*arrow.StructType {
+ if projected == nil {
+ return &arrow.StructType{}
+ }
+
+ if ty, ok := projected.(*arrow.StructType); ok {
+ return ty
+ }
+
+ panic("expected a struct")
+}
+
+func (p *pruneParquetSchema) projectList(listType arrow.ListLikeType,
elemResult arrow.DataType) arrow.ListLikeType {
+ if arrow.TypeEqual(listType.Elem(), elemResult) {
+ return listType
+ }
+
+ origField := listType.ElemField()
+ origField.Type = elemResult
+
+ switch listType.(type) {
+ case *arrow.ListType:
+ return arrow.ListOfField(origField)
+ case *arrow.LargeListType:
+ return arrow.LargeListOfField(origField)
+ case *arrow.ListViewType:
+ return arrow.ListViewOfField(origField)
+ }
+
+ n := listType.(*arrow.FixedSizeListType).Len()
+ return arrow.FixedSizeListOfField(n, origField)
+}
+
+func (p *pruneParquetSchema) projectMap(m *arrow.MapType, valResult
arrow.DataType) *arrow.MapType {
+ if arrow.TypeEqual(m.ItemType(), valResult) {
+ return m
+ }
+
+ return arrow.MapOf(m.KeyType(), valResult)
+}
diff --git a/table/internal/utils.go b/table/internal/utils.go
new file mode 100644
index 0000000..6efc1bc
--- /dev/null
+++ b/table/internal/utils.go
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import "container/heap"
+
+// Enumerated wraps a value with its position in a sequence so that a stream
+// of values can be processed in parallel and then reordered afterwards.
+type Enumerated[T any] struct {
+ Value T
+ Index int
+ Last bool
+}
+
+// pqueue is a simple generic priority queue implementing heap.Interface,
+// ordered by the provided compare function.
+type pqueue[T any] struct {
+ queue []*T
+ compare func(a, b *T) bool
+}
+
+func (pq *pqueue[T]) Len() int { return len(pq.queue) }
+func (pq *pqueue[T]) Less(i, j int) bool {
+ return pq.compare(pq.queue[i], pq.queue[j])
+}
+func (pq *pqueue[T]) Swap(i, j int) {
+ pq.queue[i], pq.queue[j] = pq.queue[j], pq.queue[i]
+}
+
+func (pq *pqueue[T]) Push(x any) {
+ pq.queue = append(pq.queue, x.(*T))
+}
+
+func (pq *pqueue[T]) Pop() any {
+ old := pq.queue
+ n := len(old)
+
+ item := old[n-1]
+ old[n-1] = nil
+ pq.queue = old[0 : n-1]
+ return item
+}
+
+// MakeSequencedChan creates a channel that emits the values read from source
+// in the order determined by the comesAfter and isNext functions: values that
+// arrive out of order are buffered in a priority queue until isNext reports
+// that the smallest buffered value follows the previously emitted one.
+func MakeSequencedChan[T any](bufferSize uint, source <-chan T, comesAfter,
isNext func(a, b *T) bool, initial T) <-chan T {
+ pq := pqueue[T]{queue: make([]*T, 0), compare: comesAfter}
+ heap.Init(&pq)
+ previous, out := &initial, make(chan T, bufferSize)
+ go func() {
+ defer close(out)
+ for val := range source {
+ heap.Push(&pq, &val)
+ for pq.Len() > 0 && isNext(previous, pq.queue[0]) {
+ previous = heap.Pop(&pq).(*T)
+ out <- *previous
+ }
+ }
+ }()
+ return out
+}
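
As a usage sketch within this package (the channel contents and the ordering
functions here are hypothetical, chosen to emit values by ascending Index),
values fanned out to parallel workers can be stitched back into order:

    src := make(chan Enumerated[string], 3)
    src <- Enumerated[string]{Value: "b", Index: 1}
    src <- Enumerated[string]{Value: "a", Index: 0}
    src <- Enumerated[string]{Value: "c", Index: 2, Last: true}
    close(src)

    ordered := MakeSequencedChan(3, src,
        // heap ordering: a should be emitted before b
        func(a, b *Enumerated[string]) bool { return a.Index < b.Index },
        // b is the next value after the previously emitted a
        func(a, b *Enumerated[string]) bool { return b.Index == a.Index+1 },
        Enumerated[string]{Index: -1})
    for v := range ordered {
        fmt.Println(v.Index, v.Value) // 0 a, 1 b, 2 c
    }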
diff --git a/table/scanner.go b/table/scanner.go
index ea33372..f7aab11 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -21,14 +21,19 @@ import (
"cmp"
"context"
"fmt"
+ "iter"
"runtime"
"slices"
"sync"
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
)
+// ScanNoLimit indicates that a scan has no row limit.
+const ScanNoLimit = -1
+
type keyDefaultMap[K comparable, V any] struct {
defaultFactory func(K) V
data map[K]V
@@ -52,15 +57,15 @@ func (k *keyDefaultMap[K, V]) Get(key K) V {
return v
}
-func newKeyDefaultMap[K comparable, V any](factory func(K) V) keyDefaultMap[K,
V] {
- return keyDefaultMap[K, V]{
+func newKeyDefaultMap[K comparable, V any](factory func(K) V)
*keyDefaultMap[K, V] {
+ return &keyDefaultMap[K, V]{
data: make(map[K]V),
defaultFactory: factory,
}
}
-func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error))
keyDefaultMap[K, V] {
- return keyDefaultMap[K, V]{
+func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error))
*keyDefaultMap[K, V] {
+ return &keyDefaultMap[K, V]{
data: make(map[K]V),
defaultFactory: func(k K) V {
v, err := factory(k)
@@ -124,51 +129,51 @@ type Scan struct {
caseSensitive bool
snapshotID *int64
options iceberg.Properties
+ limit int64
+
+ partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
+}
- partitionFilters keyDefaultMap[int, iceberg.BooleanExpression]
+// UseRowLimit returns a copy of this scan limited to n rows; use ScanNoLimit
+// for no limit.
+func (scan *Scan) UseRowLimit(n int64) *Scan {
+ out := *scan
+ out.limit = n
+ return &out
}
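
For example (a scan is assumed to exist):

    limited := scan.UseRowLimit(10) // the original scan is unchanged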
-func (s *Scan) UseRef(name string) (*Scan, error) {
- if s.snapshotID != nil {
+func (scan *Scan) UseRef(name string) (*Scan, error) {
+ if scan.snapshotID != nil {
return nil, fmt.Errorf("%w: cannot override ref, already set
snapshot id %d",
- iceberg.ErrInvalidArgument, *s.snapshotID)
+ iceberg.ErrInvalidArgument, *scan.snapshotID)
}
- if snap := s.metadata.SnapshotByName(name); snap != nil {
- out := &Scan{
- metadata: s.metadata,
- io: s.io,
- rowFilter: s.rowFilter,
- selectedFields: s.selectedFields,
- caseSensitive: s.caseSensitive,
- snapshotID: &snap.SnapshotID,
- options: s.options,
- }
+ if snap := scan.metadata.SnapshotByName(name); snap != nil {
+ out := *scan
+ out.snapshotID = &snap.SnapshotID
out.partitionFilters =
newKeyDefaultMapWrapErr(out.buildPartitionProjection)
- return out, nil
+ return &out, nil
}
return nil, fmt.Errorf("%w: cannot scan unknown ref=%s",
iceberg.ErrInvalidArgument, name)
}
-func (s *Scan) Snapshot() *Snapshot {
- if s.snapshotID != nil {
- return s.metadata.SnapshotByID(*s.snapshotID)
+func (scan *Scan) Snapshot() *Snapshot {
+ if scan.snapshotID != nil {
+ return scan.metadata.SnapshotByID(*scan.snapshotID)
}
- return s.metadata.CurrentSnapshot()
+ return scan.metadata.CurrentSnapshot()
}
-func (s *Scan) Projection() (*iceberg.Schema, error) {
- curSchema := s.metadata.CurrentSchema()
- if s.snapshotID != nil {
- snap := s.metadata.SnapshotByID(*s.snapshotID)
+func (scan *Scan) Projection() (*iceberg.Schema, error) {
+ curSchema := scan.metadata.CurrentSchema()
+ if scan.snapshotID != nil {
+ snap := scan.metadata.SnapshotByID(*scan.snapshotID)
if snap == nil {
- return nil, fmt.Errorf("%w: snapshot not found: %d",
ErrInvalidOperation, *s.snapshotID)
+ return nil, fmt.Errorf("%w: snapshot not found: %d",
ErrInvalidOperation, *scan.snapshotID)
}
if snap.SchemaID != nil {
- for _, schema := range s.metadata.Schemas() {
+ for _, schema := range scan.metadata.Schemas() {
if schema.ID == *snap.SchemaID {
curSchema = schema
break
@@ -177,33 +182,33 @@ func (s *Scan) Projection() (*iceberg.Schema, error) {
}
}
- if slices.Contains(s.selectedFields, "*") {
+ if slices.Contains(scan.selectedFields, "*") {
return curSchema, nil
}
- return curSchema.Select(s.caseSensitive, s.selectedFields...)
+ return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
}
-func (s *Scan) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
- project := newInclusiveProjection(s.metadata.CurrentSchema(),
- s.metadata.PartitionSpecs()[specID], true)
- return project(s.rowFilter)
+func (scan *Scan) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
+ project := newInclusiveProjection(scan.metadata.CurrentSchema(),
+ scan.metadata.PartitionSpecs()[specID], true)
+ return project(scan.rowFilter)
}
-func (s *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile)
(bool, error), error) {
- spec := s.metadata.PartitionSpecs()[specID]
- return newManifestEvaluator(spec, s.metadata.CurrentSchema(),
- s.partitionFilters.Get(specID), s.caseSensitive)
+func (scan *Scan) buildManifestEvaluator(specID int)
(func(iceberg.ManifestFile) (bool, error), error) {
+ spec := scan.metadata.PartitionSpecs()[specID]
+ return newManifestEvaluator(spec, scan.metadata.CurrentSchema(),
+ scan.partitionFilters.Get(specID), scan.caseSensitive)
}
-func (s *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile)
(bool, error) {
- spec := s.metadata.PartitionSpecs()[specID]
- partType := spec.PartitionType(s.metadata.CurrentSchema())
+func (scan *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile)
(bool, error) {
+ spec := scan.metadata.PartitionSpecs()[specID]
+ partType := spec.PartitionType(scan.metadata.CurrentSchema())
partSchema := iceberg.NewSchema(0, partType.FieldList...)
- partExpr := s.partitionFilters.Get(specID)
+ partExpr := scan.partitionFilters.Get(specID)
return func(d iceberg.DataFile) (bool, error) {
- fn, err := iceberg.ExpressionEvaluator(partSchema, partExpr,
s.caseSensitive)
+ fn, err := iceberg.ExpressionEvaluator(partSchema, partExpr,
scan.caseSensitive)
if err != nil {
return false, err
}
@@ -212,7 +217,7 @@ func (s *Scan) buildPartitionEvaluator(specID int)
func(iceberg.DataFile) (bool,
}
}
-func (s *Scan) checkSequenceNumber(minSeqNum int64, manifest
iceberg.ManifestFile) bool {
+func (scan *Scan) checkSequenceNumber(minSeqNum int64, manifest
iceberg.ManifestFile) bool {
return manifest.ManifestContent() == iceberg.ManifestContentData ||
(manifest.ManifestContent() == iceberg.ManifestContentDeletes &&
manifest.SequenceNum() >= minSeqNum)
@@ -254,8 +259,8 @@ func matchDeletesToData(entry iceberg.ManifestEntry,
positionalDeletes []iceberg
return out, nil
}
-func (s *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
- snap := s.Snapshot()
+func (scan *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+ snap := scan.Snapshot()
if snap == nil {
return nil, nil
}
@@ -263,8 +268,8 @@ func (s *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
// step 1: filter manifests using partition summaries
// the filter depends on the partition spec used to write the manifest
file
// so create a cache of filters for each spec id
- manifestEvaluators := newKeyDefaultMapWrapErr(s.buildManifestEvaluator)
- manifestList, err := snap.Manifests(s.io)
+ manifestEvaluators :=
newKeyDefaultMapWrapErr(scan.buildManifestEvaluator)
+ manifestList, err := snap.Manifests(scan.io)
if err != nil {
return nil, err
}
@@ -278,9 +283,9 @@ func (s *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
// step 2: filter the data files in each manifest
// this filter depends on the partition spec used to write the manifest
file
- partitionEvaluators := newKeyDefaultMap(s.buildPartitionEvaluator)
+ partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)
metricsEval, err := newInclusiveMetricsEvaluator(
- s.metadata.CurrentSchema(), s.rowFilter, s.caseSensitive,
s.options["include_empty_files"] == "true")
+ scan.metadata.CurrentSchema(), scan.rowFilter,
scan.caseSensitive, scan.options["include_empty_files"] == "true")
if err != nil {
return nil, err
}
@@ -289,7 +294,7 @@ func (s *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
dataEntries := make([]iceberg.ManifestEntry, 0)
positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)
- nworkers := runtime.NumCPU()
+ nworkers := min(runtime.NumCPU(), len(manifestList))
var wg sync.WaitGroup
manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
@@ -309,11 +314,11 @@ func (s *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
return
}
- if !s.checkSequenceNumber(minSeqNum, m)
{
+ if !scan.checkSequenceNumber(minSeqNum,
m) {
continue
}
- entries, err := openManifest(s.io, m,
+ entries, err := openManifest(scan.io, m,
partitionEvaluators.Get(int(m.PartitionSpecID())), metricsEval)
if err != nil {
cancel(err)
@@ -393,3 +398,62 @@ type FileScanTask struct {
DeleteFiles []iceberg.DataFile
Start, Length int64
}
+
+// ToArrowRecords returns the arrow schema of the expected records and an
+// iterator that can be used with a range expression to read the records as
+// they become available. If an error is encountered during planning and
+// setup, it is returned directly. If an error occurs while iterating the
+// records, it is returned by the iterator.
+//
+// The schema is returned up front to handle the case where no rows are
+// returned: the resulting Arrow Schema of the projection is still known.
+func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema,
iter.Seq2[arrow.Record, error], error) {
+ tasks, err := scan.PlanFiles(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ var boundFilter iceberg.BooleanExpression
+ if scan.rowFilter != nil {
+ boundFilter, err =
iceberg.BindExpr(scan.metadata.CurrentSchema(), scan.rowFilter,
scan.caseSensitive)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+
+ schema, err := scan.Projection()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return (&arrowScan{
+ metadata: scan.metadata,
+ fs: scan.io,
+ projectedSchema: schema,
+ boundRowFilter: boundFilter,
+ caseSensitive: scan.caseSensitive,
+ rowLimit: scan.limit,
+ options: scan.options,
+ }).GetRecords(ctx, tasks)
+}
+
+// ToArrowTable calls ToArrowRecords, gathers all of the records together,
+// and returns an arrow.Table made from those records.
+func (scan *Scan) ToArrowTable(ctx context.Context) (arrow.Table, error) {
+ schema, itr, err := scan.ToArrowRecords(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ records := make([]arrow.Record, 0)
+ for rec, err := range itr {
+ if err != nil {
+ return nil, err
+ }
+
+ defer rec.Release()
+ records = append(records, rec)
+ }
+
+ return array.NewTableFromRecords(schema, records), nil
+}
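
A consumption sketch for the record iterator (the table handle tbl, the ctx,
and the field name are assumed to exist):

    schema, itr, err := tbl.Scan(table.WithSelectedFields("number")).ToArrowRecords(ctx)
    if err != nil {
        return err
    }
    fmt.Println(schema) // known even when zero rows come back

    for rec, err := range itr {
        if err != nil {
            return err
        }
        fmt.Println(rec.NumRows())
        rec.Release()
    }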
diff --git a/table/scanner_test.go b/table/scanner_test.go
index af4b8f6..dcb7ed1 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -21,23 +21,46 @@ package table_test
import (
"context"
+ "iter"
+ "math"
+ "slices"
+ "strconv"
+ "strings"
"testing"
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute"
+ "github.com/apache/arrow-go/v18/arrow/extensions"
+ "github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
"github.com/apache/iceberg-go/io"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/suite"
)
-func TestScanner(t *testing.T) {
+type ScannerSuite struct {
+ suite.Suite
+
+ ctx context.Context
+ cat catalog.Catalog
+ props iceberg.Properties
+}
+
+func (s *ScannerSuite) SetupTest() {
+ s.ctx = context.Background()
+
cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
- require.NoError(t, err)
+ s.Require().NoError(err)
- props := iceberg.Properties{
+ s.cat = cat
+ s.props = iceberg.Properties{
io.S3Region: "us-east-1",
io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password"}
+}
+func (s *ScannerSuite) TestScanner() {
tests := []struct {
table string
expr iceberg.BooleanExpression
@@ -54,68 +77,549 @@ func TestScanner(t *testing.T) {
{"test_partitioned_by_years",
iceberg.LessThan(iceberg.Reference("dt"), "2023-03-05"), 1},
{"test_partitioned_by_years",
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
{"test_partitioned_by_months",
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
- {"test_partitioned_by_days",
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"),
4},
+ {"test_partitioned_by_days",
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"),
8},
{"test_partitioned_by_hours",
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"),
8},
{"test_partitioned_by_truncate",
iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e"), 8},
{"test_partitioned_by_bucket",
iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5)), 6},
- {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4},
{"test_uuid_and_fixed_unpartitioned",
iceberg.EqualTo(iceberg.Reference("uuid_col"),
"102cb62f-e6f8-4eb0-9973-d9b012ff0967"), 1},
}
for _, tt := range tests {
- t.Run(tt.table+" "+tt.expr.String(), func(t *testing.T) {
+ s.Run(tt.table+" "+tt.expr.String(), func() {
ident := catalog.ToRestIdentifier("default", tt.table)
- tbl, err := cat.LoadTable(context.Background(), ident,
props)
- require.NoError(t, err)
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
- scan := tbl.Scan(tt.expr, 0, true, "*")
- tasks, err := scan.PlanFiles(context.Background())
- require.NoError(t, err)
+ scan := tbl.Scan(table.WithRowFilter(tt.expr))
+ tasks, err := scan.PlanFiles(s.ctx)
+ s.Require().NoError(err)
- assert.Len(t, tasks, tt.expectedNumTasks)
+ s.Len(tasks, tt.expectedNumTasks)
})
}
}
-func TestScannerWithDeletes(t *testing.T) {
- cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
- require.NoError(t, err)
-
- props := iceberg.Properties{
- io.S3Region: "us-east-1",
- io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password"}
-
+func (s *ScannerSuite) TestScannerWithDeletes() {
ident := catalog.ToRestIdentifier("default",
"test_positional_mor_deletes")
- tbl, err := cat.LoadTable(context.Background(), ident, props)
- require.NoError(t, err)
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
- scan := tbl.Scan(iceberg.AlwaysTrue{}, 0, true, "*")
- tasks, err := scan.PlanFiles(context.Background())
- require.NoError(t, err)
+ scan := tbl.Scan()
+ tasks, err := scan.PlanFiles(s.ctx)
+ s.Require().NoError(err)
- assert.Len(t, tasks, 1)
- assert.Len(t, tasks[0].DeleteFiles, 1)
+ s.Len(tasks, 1)
+ s.Len(tasks[0].DeleteFiles, 1)
tagScan, err := scan.UseRef("tag_12")
- require.NoError(t, err)
+ s.Require().NoError(err)
- tasks, err = tagScan.PlanFiles(context.Background())
- require.NoError(t, err)
+ tasks, err = tagScan.PlanFiles(s.ctx)
+ s.Require().NoError(err)
- assert.Len(t, tasks, 1)
- assert.Len(t, tasks[0].DeleteFiles, 0)
+ s.Len(tasks, 1)
+ s.Len(tasks[0].DeleteFiles, 0)
_, err = tagScan.UseRef("without_5")
- assert.ErrorIs(t, err, iceberg.ErrInvalidArgument)
+ s.ErrorIs(err, iceberg.ErrInvalidArgument)
tagScan, err = scan.UseRef("without_5")
- require.NoError(t, err)
+ s.Require().NoError(err)
+
+ tasks, err = tagScan.PlanFiles(s.ctx)
+ s.Require().NoError(err)
+
+ s.Len(tasks, 1)
+ s.Len(tasks[0].DeleteFiles, 1)
+}
+
+func (s *ScannerSuite) TestArrowNan() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ for _, name := range []string{"test_null_nan",
"test_null_nan_rewritten"} {
+ s.Run(name, func() {
+ ident := catalog.ToRestIdentifier("default", name)
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ results, err :=
tbl.Scan(table.WithRowFilter(iceberg.IsNaN(iceberg.Reference("col_numeric"))),
+ table.WithSelectedFields("idx",
"col_numeric")).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(2, results.NumCols())
+ s.EqualValues(1, results.NumRows())
+
+ s.Equal(int32(1),
results.Column(0).Data().Chunk(0).(*array.Int32).Value(0))
+
s.True(math.IsNaN(float64(results.Column(1).Data().Chunk(0).(*array.Float32).Value(0))))
+ })
+ }
+}
+
+func (s *ScannerSuite) TestArrowNotNanCount() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToRestIdentifier("default", "test_null_nan")
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ results, err :=
tbl.Scan(table.WithRowFilter(iceberg.NotNaN(iceberg.Reference("col_numeric"))),
+ table.WithSelectedFields("idx")).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(1, results.NumCols())
+ s.EqualValues(2, results.NumRows())
+}
+
+func (s *ScannerSuite) TestScanWithLimit() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToRestIdentifier("default", "test_limit")
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ tests := []struct {
+ limit int64
+ expectedRows int64
+ }{
+ {1, 1},
+ {0, 0},
+ {999, 10},
+ }
+
+ for _, tt := range tests {
+ s.Run(strconv.Itoa(int(tt.limit)), func() {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(s.T())
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ result, err := tbl.Scan(table.WithSelectedFields("idx"),
+ table.WithLimit(tt.limit)).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer result.Release()
+
+ s.EqualValues(tt.expectedRows, result.NumRows())
+ })
+ }
+}
+
+func (s *ScannerSuite) TestScannerRecordsDeletes() {
+ // number, letter
+ // (1, 'a'),
+ // (2, 'b'),
+ // (3, 'c'),
+ // (4, 'd'),
+ // (5, 'e'),
+ // (6, 'f'),
+ // (7, 'g'),
+ // (8, 'h'),
+ // (9, 'i'), <- deleted
+ // (10, 'j'),
+ // (11, 'k'),
+ // (12, 'l')
+ ident := catalog.ToRestIdentifier("default",
"test_positional_mor_deletes")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable:
true},
+ }, nil)
+
+ ref := iceberg.Reference("letter")
+
+ tests := []struct {
+ name string
+ filter iceberg.BooleanExpression
+ rowLimit int64
+ expected string
+ }{
+ {"all", iceberg.AlwaysTrue{}, table.ScanNoLimit,
+ `[1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]`},
+ {"filter", iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+ iceberg.LessThan(ref, "k")), table.ScanNoLimit, `[5, 6,
7, 8, 10]`},
+ {"filter and limit",
iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+ iceberg.LessThan(ref, "k")), 1, `[5]`},
+ {"limit", nil, 3, `[1, 2, 3]`},
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(s.T())
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+
+ scan := tbl.Scan(table.WithRowFilter(tt.filter),
+ table.WithSelectedFields("number"))
+ tasks, err := scan.PlanFiles(ctx)
+ s.Require().NoError(err)
+
+ s.Len(tasks, 1)
+ s.Len(tasks[0].DeleteFiles, 1)
+
+ _, itr, err :=
scan.UseRowLimit(tt.rowLimit).ToArrowRecords(ctx)
+ s.Require().NoError(err)
+
+ next, stop := iter.Pull2(itr)
+ defer stop()
+
+ rec, err, valid := next()
+ s.Require().True(valid)
+ s.Require().NoError(err)
+ defer rec.Release()
+
+ s.True(expectedSchema.Equal(rec.Schema()), "expected:
%s\ngot: %s\n",
+ expectedSchema, rec.Schema())
+
+ arr, _, err := array.FromJSON(mem,
arrow.PrimitiveTypes.Int32,
+ strings.NewReader(tt.expected))
+ s.Require().NoError(err)
+ defer arr.Release()
+
+ expectedResult := array.NewRecord(expectedSchema,
[]arrow.Array{arr}, int64(arr.Len()))
+ defer expectedResult.Release()
+
+ s.True(array.RecordEqual(expectedResult, rec),
"expected: %s\ngot: %s\n", expectedResult, rec)
+
+ _, err, valid = next()
+ s.Require().NoError(err)
+ s.Require().False(valid)
+ })
+ }
+}
+
+func (s *ScannerSuite) TestScannerRecordsDoubleDeletes() {
+ // number, letter
+ // (1, 'a'),
+ // (2, 'b'),
+ // (3, 'c'),
+ // (4, 'd'),
+ // (5, 'e'),
+ // (6, 'f'), <- second delete
+ // (7, 'g'),
+ // (8, 'h'),
+ // (9, 'i'), <- first delete
+ // (10, 'j'),
+ // (11, 'k'),
+ // (12, 'l')
+ ident := catalog.ToRestIdentifier("default",
"test_positional_mor_double_deletes")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable:
true},
+ }, nil)
+
+ ref := iceberg.Reference("letter")
+
+ tests := []struct {
+ name string
+ filter iceberg.BooleanExpression
+ rowLimit int64
+ expected string
+ }{
+ {"all", iceberg.AlwaysTrue{}, table.ScanNoLimit,
+ `[1, 2, 3, 4, 5, 7, 8, 10, 11, 12]`},
+ {"filter", iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+ iceberg.LessThan(ref, "k")), table.ScanNoLimit, `[5, 7,
8, 10]`},
+ {"filter and limit",
iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+ iceberg.LessThan(ref, "k")), 1, `[5]`},
+ {"limit", nil, 8, `[1, 2, 3, 4, 5, 7, 8, 10]`},
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(s.T())
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+
+ scan := tbl.Scan(table.WithRowFilter(tt.filter),
+ table.WithSelectedFields("number"))
+ tasks, err := scan.PlanFiles(ctx)
+ s.Require().NoError(err)
+
+ s.Len(tasks, 1)
+ s.Len(tasks[0].DeleteFiles, 2)
- tasks, err = tagScan.PlanFiles(context.Background())
- require.NoError(t, err)
+ _, itr, err :=
scan.UseRowLimit(tt.rowLimit).ToArrowRecords(ctx)
+ s.Require().NoError(err)
- assert.Len(t, tasks, 1)
- assert.Len(t, tasks[0].DeleteFiles, 1)
+ next, stop := iter.Pull2(itr)
+ defer stop()
+
+ rec, err, valid := next()
+ s.Require().True(valid)
+ s.Require().NoError(err)
+ defer rec.Release()
+
+ s.True(expectedSchema.Equal(rec.Schema()), "expected:
%s\ngot: %s\n",
+ expectedSchema, rec.Schema())
+
+ arr, _, err := array.FromJSON(mem,
arrow.PrimitiveTypes.Int32,
+ strings.NewReader(tt.expected))
+ s.Require().NoError(err)
+ defer arr.Release()
+
+ expectedResult := array.NewRecord(expectedSchema,
[]arrow.Array{arr}, int64(arr.Len()))
+ defer expectedResult.Release()
+
+ s.True(array.RecordEqual(expectedResult, rec),
"expected: %s\ngot: %s\n", expectedResult, rec)
+
+ _, err, valid = next()
+ s.Require().NoError(err)
+ s.Require().False(valid)
+ })
+ }
+}
+
+func getSortedValues(col *arrow.Column) []int32 {
+ result := make([]int32, 0, col.Len())
+ for _, c := range col.Data().Chunks() {
+ arr := c.(*array.Int32)
+ result = append(result, arr.Int32Values()...)
+ }
+ slices.Sort(result)
+ return result
+}
+
+func getStrValues(col *arrow.Column) []string {
+ result := make([]string, 0, col.Len())
+ for _, c := range col.Data().Chunks() {
+ for i := 0; i < c.Len(); i++ {
+ result = append(result, c.ValueStr(i))
+ }
+ }
+ return result
+}
+
+func (s *ScannerSuite) TestPartitionedTables() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable:
true},
+ }, nil)
+
+ tests := []struct {
+ table string
+ predicate iceberg.BooleanExpression
+ }{
+ {"test_partitioned_by_identity",
+ iceberg.GreaterThanEqual(iceberg.Reference("ts"),
"2023-03-05T00:00:00+00:00")},
+ {"test_partitioned_by_years",
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05")},
+ {"test_partitioned_by_months",
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05")},
+ {"test_partitioned_by_days",
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00")},
+ {"test_partitioned_by_hours",
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00")},
+ {"test_partitioned_by_truncate",
iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e")},
+ {"test_partitioned_by_bucket",
iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5))},
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.table+" "+tt.predicate.String(), func() {
+ scopedMem := memory.NewCheckedAllocatorScope(mem)
+ defer scopedMem.CheckSize(s.T())
+ ctx := compute.WithAllocator(s.ctx, mem)
+
+ ident := catalog.ToRestIdentifier("default", tt.table)
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ scan := tbl.Scan(table.WithRowFilter(tt.predicate),
+ table.WithSelectedFields("number"))
+ resultTable, err := scan.ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer resultTable.Release()
+
+ s.True(expectedSchema.Equal(resultTable.Schema()),
"expected: %s\ngot: %s\n",
+ expectedSchema, resultTable.Schema())
+
+ s.Equal([]int32{5, 6, 7, 8, 9, 10, 11, 12},
+ getSortedValues(resultTable.Column(0)))
+ })
+ }
+}
+
+func (s *ScannerSuite) TestUnpartitionedUUIDTable() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ expectedSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "uuid_col", Type: extensions.NewUUIDType(), Nullable:
true},
+ }, nil)
+
+ ident := catalog.ToRestIdentifier("default",
"test_uuid_and_fixed_unpartitioned")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ results, err := tbl.Scan(table.WithRowFilter(
+ iceberg.EqualTo(iceberg.Reference("uuid_col"),
+ "102cb62f-e6f8-4eb0-9973-d9b012ff0967")),
+ table.WithSelectedFields("uuid_col")).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.True(expectedSchema.Equal(results.Schema()), "expected: %s\ngot:
%s\n",
+ expectedSchema, results.Schema())
+
+ s.EqualValues(1, results.NumRows())
+ resultCol := results.Column(0).Data().Chunk(0).(*extensions.UUIDArray)
+ s.Equal("102cb62f-e6f8-4eb0-9973-d9b012ff0967", resultCol.ValueStr(0))
+
+ neqResults, err := tbl.Scan(table.WithRowFilter(
+ iceberg.NewAnd(
+ iceberg.NotEqualTo(iceberg.Reference("uuid_col"),
+ "102cb62f-e6f8-4eb0-9973-d9b012ff0967"),
+ iceberg.NotEqualTo(iceberg.Reference("uuid_col"),
+ "639cccce-c9d2-494a-a78c-278ab234f024"))),
+ table.WithSelectedFields("uuid_col")).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer neqResults.Release()
+
+ s.EqualValues(3, neqResults.NumRows())
+ s.Equal([]string{"ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226",
+ "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b",
+ "923dae77-83d6-47cd-b4b0-d383e64ee57e"},
getStrValues(neqResults.Column(0)))
+}
+
+func (s *ScannerSuite) TestUnpartitionedFixedTable() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToRestIdentifier("default",
"test_uuid_and_fixed_unpartitioned")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ results, err := tbl.Scan(table.WithRowFilter(
+ iceberg.EqualTo(iceberg.Reference("fixed_col"),
+ "1234567890123456789012345")),
+ table.WithCaseSensitive(false),
+ table.WithSelectedFields("fixed_col")).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(1, results.NumRows())
+ resultCol := results.Column(0).Data().Chunk(0).(*array.FixedSizeBinary)
+ s.Equal([]byte("1234567890123456789012345"), resultCol.Value(0))
+
+ results, err = tbl.Scan(table.WithRowFilter(
+ iceberg.NewAnd(
+ iceberg.NotEqualTo(iceberg.Reference("fixed_col"),
"1234567890123456789012345"),
+ iceberg.NotEqualTo(iceberg.Reference("uuid_col"),
"c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"))),
+ table.WithCaseSensitive(false),
table.WithSelectedFields("fixed_col")).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(3, results.NumRows())
+ resultCol = results.Column(0).Data().Chunk(0).(*array.FixedSizeBinary)
+ s.Equal([]byte("1231231231231231231231231"), resultCol.Value(0))
+ resultCol = results.Column(0).Data().Chunk(1).(*array.FixedSizeBinary)
+ s.Equal([]byte("12345678901234567ass12345"), resultCol.Value(0))
+ resultCol = results.Column(0).Data().Chunk(2).(*array.FixedSizeBinary)
+ s.Equal([]byte("qweeqwwqq1231231231231111"), resultCol.Value(0))
+}
+
+func (s *ScannerSuite) TestScanTag() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToRestIdentifier("default",
"test_positional_mor_deletes")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ scan, err := tbl.Scan().UseRef("tag_12")
+ s.Require().NoError(err)
+
+ results, err := scan.ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(3, results.NumCols())
+ s.Equal([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
+ results.Column(1).Data().Chunk(0).(*array.Int32).Int32Values())
+}
+
+func (s *ScannerSuite) TestScanBranch() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToRestIdentifier("default",
"test_positional_mor_deletes")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ scan, err := tbl.Scan().UseRef("without_5")
+ s.Require().NoError(err)
+
+ results, err := scan.ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(3, results.NumCols())
+ s.Equal([]int32{1, 2, 3, 4, 6, 7, 8, 9, 10, 11, 12},
+ results.Column(1).Data().Chunk(0).(*array.Int32).Int32Values())
+}
+
+func (s *ScannerSuite) TestFilterOnNewColumn() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(s.T(), 0)
+
+ ident := catalog.ToRestIdentifier("default", "test_table_add_column")
+
+ tbl, err := s.cat.LoadTable(s.ctx, ident, s.props)
+ s.Require().NoError(err)
+
+ ctx := compute.WithAllocator(s.ctx, mem)
+ results, err := tbl.Scan(table.WithRowFilter(
+ iceberg.EqualTo(iceberg.Reference("b"), "2"))).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(2, results.NumCols())
+ s.EqualValues(1, results.NumRows())
+ s.Equal("2", results.Column(1).Data().Chunk(0).(*array.String).Value(0))
+
+ results, err = tbl.Scan(table.WithRowFilter(
+ iceberg.NotNull(iceberg.Reference("b")))).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(2, results.NumCols())
+ s.EqualValues(1, results.NumRows())
+ s.Equal("2", results.Column(1).Data().Chunk(0).(*array.String).Value(0))
+
+ results, err = tbl.Scan(table.WithRowFilter(
+ iceberg.IsNull(iceberg.Reference("b")))).ToArrowTable(ctx)
+ s.Require().NoError(err)
+ defer results.Release()
+
+ s.EqualValues(2, results.NumCols())
+ s.EqualValues(1, results.NumRows())
+ s.False(results.Column(1).Data().Chunk(0).(*array.String).IsValid(0))
+}
+
+func TestScanner(t *testing.T) {
+ suite.Run(t, new(ScannerSuite))
}
diff --git a/table/substrait/functions_set.yaml
b/table/substrait/functions_set.yaml
new file mode 100644
index 0000000..05ec383
--- /dev/null
+++ b/table/substrait/functions_set.yaml
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+%YAML 1.2
+---
+scalar_functions:
+ -
+ name: "is_in"
+ description: >
+ Checks membership of a value in a list of values
+
+ Returns true if `needle` is found in `haystack`, false otherwise.
+ impls:
+ - args:
+ - name: needle
+ value: any1
+ - name: haystack
+ value: list<any1>
+ options:
+ nan_equality:
+ values: [ NAN_IS_NAN, NAN_IS_NOT_NAN ]
+ return: "boolean"
diff --git a/table/substrait/substrait.go b/table/substrait/substrait.go
new file mode 100644
index 0000000..b6078a6
--- /dev/null
+++ b/table/substrait/substrait.go
@@ -0,0 +1,381 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package substrait
+
+import (
+ _ "embed"
+ "fmt"
+ "strings"
+
+ "github.com/apache/arrow-go/v18/arrow/compute/exprs"
+ "github.com/apache/iceberg-go"
+ "github.com/substrait-io/substrait-go/expr"
+ "github.com/substrait-io/substrait-go/extensions"
+ "github.com/substrait-io/substrait-go/types"
+)
+
+//go:embed functions_set.yaml
+var funcsetYAML string
+
+var (
+ collection = extensions.DefaultCollection
+ funcSetURI =
"https://github.com/apache/iceberg-go/blob/main/table/substrait/functions_set.yaml"
+)
+
+func init() {
+ if !collection.URILoaded(funcSetURI) {
+ if err := collection.Load(funcSetURI,
strings.NewReader(funcsetYAML)); err != nil {
+ panic(err)
+ }
+ }
+}
+
+func NewExtensionSet() exprs.ExtensionIDSet {
+ return
exprs.NewExtensionSetDefault(expr.NewEmptyExtensionRegistry(&collection))
+}
+
+// ConvertExpr converts the given bound expression to a substrait expression,
+// re-binding any references against the provided schema, so that it can be
+// used for computation.
+func ConvertExpr(schema *iceberg.Schema, e iceberg.BooleanExpression,
caseSensitive bool) (*expr.ExtensionRegistry, expr.Expression, error) {
+ base, err := ConvertSchema(schema)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ reg := expr.NewEmptyExtensionRegistry(&extensions.DefaultCollection)
+
+ bldr := expr.ExprBuilder{Reg: reg, BaseSchema: &base.Struct}
+ b, err := iceberg.VisitExpr(e, &toSubstraitExpr{bldr: bldr, schema:
schema,
+ caseSensitive: caseSensitive})
+ if err != nil {
+ return nil, nil, err
+ }
+
+ out, err := b.BuildExpr()
+ return &reg, out, err
+}
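
A minimal sketch of converting a filter (the schema and field are
hypothetical); note the expression must be bound before conversion:

    sc := iceberg.NewSchema(0,
        iceberg.NestedField{ID: 1, Name: "x", Type: iceberg.PrimitiveTypes.Int64})
    bound, err := iceberg.BindExpr(sc,
        iceberg.GreaterThan(iceberg.Reference("x"), int64(5)), true)
    if err != nil {
        panic(err)
    }
    _, converted, err := ConvertExpr(sc, bound, true)
    if err != nil {
        panic(err)
    }
    fmt.Println(converted) // e.g. gt(.field(0) => i64?, i64(5)) => boolean?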
+
+// ConvertSchema converts an Iceberg schema to a substrait NamedStruct using
+// the appropriate types and column names.
+func ConvertSchema(schema *iceberg.Schema) (res types.NamedStruct, err error) {
+ var typ types.Type
+
+ typ, err = iceberg.Visit(schema, convertToSubstrait{})
+ if err != nil {
+ return
+ }
+
+ val := typ.(*types.StructType)
+ res.Struct = *val
+
+ res.Names = make([]string, schema.NumFields())
+ for i, f := range schema.Fields() {
+ res.Names[i] = f.Name
+ }
+
+ return
+}
+
+type convertToSubstrait struct{}
+
+func (convertToSubstrait) Schema(_ *iceberg.Schema, result types.Type)
types.Type {
+ return result.WithNullability(types.NullabilityNullable)
+}
+
+func (convertToSubstrait) Struct(_ iceberg.StructType, results []types.Type)
types.Type {
+ return &types.StructType{
+ Nullability: types.NullabilityUnspecified,
+ Types: results,
+ }
+}
+
+func getNullability(required bool) types.Nullability {
+ if required {
+ return types.NullabilityRequired
+ }
+ return types.NullabilityNullable
+}
+
+func (convertToSubstrait) Field(field iceberg.NestedField, result types.Type)
types.Type {
+ return result.WithNullability(getNullability(field.Required))
+}
+
+func (c convertToSubstrait) List(list iceberg.ListType, elemResult types.Type)
types.Type {
+ return &types.ListType{
+ Nullability: types.NullabilityUnspecified,
+ Type: c.Field(list.ElementField(), elemResult),
+ }
+}
+
+func (c convertToSubstrait) Map(m iceberg.MapType, keyResult, valResult
types.Type) types.Type {
+ return &types.MapType{
+ Nullability: types.NullabilityUnspecified,
+ Key: c.Field(m.KeyField(), keyResult),
+ Value: c.Field(m.ValueField(), valResult),
+ }
+}
+
+func (convertToSubstrait) Primitive(iceberg.PrimitiveType) types.Type {
panic("should not be called") }
+
+func (convertToSubstrait) VisitFixed(f iceberg.FixedType) types.Type {
+ return &types.FixedBinaryType{Length: int32(f.Len())}
+}
+
+func (convertToSubstrait) VisitDecimal(d iceberg.DecimalType) types.Type {
+ return &types.DecimalType{Precision: int32(d.Precision()), Scale:
int32(d.Scale())}
+}
+
+func (convertToSubstrait) VisitBoolean() types.Type { return
&types.BooleanType{} }
+func (convertToSubstrait) VisitInt32() types.Type { return
&types.Int32Type{} }
+func (convertToSubstrait) VisitInt64() types.Type { return
&types.Int64Type{} }
+func (convertToSubstrait) VisitFloat32() types.Type { return
&types.Float32Type{} }
+func (convertToSubstrait) VisitFloat64() types.Type { return
&types.Float64Type{} }
+func (convertToSubstrait) VisitDate() types.Type { return
&types.DateType{} }
+func (convertToSubstrait) VisitTime() types.Type { return
&types.TimeType{} }
+func (convertToSubstrait) VisitTimestamp() types.Type { return
&types.TimestampType{} }
+func (convertToSubstrait) VisitTimestampTz() types.Type { return
&types.TimestampTzType{} }
+func (convertToSubstrait) VisitString() types.Type { return
&types.StringType{} }
+func (convertToSubstrait) VisitBinary() types.Type { return
&types.BinaryType{} }
+func (convertToSubstrait) VisitUUID() types.Type { return
&types.UUIDType{} }
+
+var (
+ _ iceberg.SchemaVisitorPerPrimitiveType[types.Type] =
(*convertToSubstrait)(nil)
+)
+
+var (
+ boolURI = extensions.SubstraitDefaultURIPrefix +
"functions_boolean.yaml"
+ compareURI = extensions.SubstraitDefaultURIPrefix +
"functions_comparison.yaml"
+ stringURI = extensions.SubstraitDefaultURIPrefix +
"functions_string.yaml"
+
+ notID = extensions.ID{URI: boolURI, Name: "not"}
+ andID = extensions.ID{URI: boolURI, Name: "and"}
+ orID = extensions.ID{URI: boolURI, Name: "or"}
+ isNaNID = extensions.ID{URI: compareURI, Name: "is_nan"}
+ isNullID = extensions.ID{URI: compareURI, Name: "is_null"}
+ isNotNullID = extensions.ID{URI: compareURI, Name: "is_not_null"}
+ equalID = extensions.ID{URI: compareURI, Name: "equal"}
+ notEqualID = extensions.ID{URI: compareURI, Name: "not_equal"}
+ greaterEqualID = extensions.ID{URI: compareURI, Name: "gte"}
+ greaterID = extensions.ID{URI: compareURI, Name: "gt"}
+ lessEqualID = extensions.ID{URI: compareURI, Name: "lte"}
+ lessID = extensions.ID{URI: compareURI, Name: "lt"}
+ startsWithID = extensions.ID{URI: stringURI, Name: "starts_with"}
+ isInID = extensions.ID{URI: funcSetURI, Name: "is_in"}
+)
+
+type toSubstraitExpr struct {
+ schema *iceberg.Schema
+ bldr expr.ExprBuilder
+ caseSensitive bool
+}
+
+func (t *toSubstraitExpr) VisitTrue() expr.Builder {
+ return t.bldr.Wrap(expr.NewLiteral(true, false))
+}
+
+func (t *toSubstraitExpr) VisitFalse() expr.Builder {
+ return t.bldr.Wrap(expr.NewLiteral(false, false))
+}
+
+func (t *toSubstraitExpr) VisitNot(child expr.Builder) expr.Builder {
+ return t.bldr.ScalarFunc(notID).Args(child.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitAnd(left, right expr.Builder) expr.Builder {
+ return t.bldr.ScalarFunc(andID).Args(left.(expr.FuncArgBuilder),
+ right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitOr(left, right expr.Builder) expr.Builder {
+ return t.bldr.ScalarFunc(orID).Args(left.(expr.FuncArgBuilder),
+ right.(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitUnbound(iceberg.UnboundPredicate) expr.Builder {
+ panic("can only convert bound expressions to substrait")
+}
+
+func (t *toSubstraitExpr) VisitBound(pred iceberg.BoundPredicate) expr.Builder
{
+ return iceberg.VisitBoundPredicate(pred, t)
+}
+
+type substraitPrimitiveLiteralTypes interface {
+ bool | ~int32 | ~int64 | float32 | float64 | string
+}
+
+func toPrimitiveSubstraitLiteral[T substraitPrimitiveLiteralTypes](v T)
expr.Literal {
+ return expr.NewPrimitiveLiteral(v, false)
+}
+
+func toByteSliceSubstraitLiteral[T []byte | types.UUID](v T) expr.Literal {
+ return expr.NewByteSliceLiteral(v, false)
+}
+
+func toDecimalLiteral(v iceberg.DecimalLiteral) expr.Literal {
+ byts, _ := v.MarshalBinary()
+ result, _ := expr.NewLiteral(&types.Decimal{
+ Scale: int32(v.Scale),
+ Value: byts,
+ Precision: int32(v.Type().(*iceberg.DecimalType).Precision()),
+ }, false)
+ return result
+}
+
+func toFixedLiteral(v iceberg.FixedLiteral) expr.Literal {
+ return expr.NewFixedBinaryLiteral(types.FixedBinary(v), false)
+}
+
+func toSubstraitLiteral(typ iceberg.Type, lit iceberg.Literal) expr.Literal {
+ switch lit := lit.(type) {
+ case iceberg.BoolLiteral:
+ return toPrimitiveSubstraitLiteral(bool(lit))
+ case iceberg.Int32Literal:
+ return toPrimitiveSubstraitLiteral(int32(lit))
+ case iceberg.Int64Literal:
+ return toPrimitiveSubstraitLiteral(int64(lit))
+ case iceberg.Float32Literal:
+ return toPrimitiveSubstraitLiteral(float32(lit))
+ case iceberg.Float64Literal:
+ return toPrimitiveSubstraitLiteral(float64(lit))
+ case iceberg.StringLiteral:
+ return toPrimitiveSubstraitLiteral(string(lit))
+ case iceberg.TimestampLiteral:
+ if typ.Equals(iceberg.PrimitiveTypes.TimestampTz) {
+ return
toPrimitiveSubstraitLiteral(types.TimestampTz(lit))
+ }
+ return toPrimitiveSubstraitLiteral(types.Timestamp(lit))
+ case iceberg.DateLiteral:
+ return toPrimitiveSubstraitLiteral(types.Date(lit))
+ case iceberg.TimeLiteral:
+ return toPrimitiveSubstraitLiteral(types.Time(lit))
+ case iceberg.BinaryLiteral:
+ return toByteSliceSubstraitLiteral([]byte(lit))
+ case iceberg.FixedLiteral:
+ return toFixedLiteral(lit)
+ case iceberg.UUIDLiteral:
+ return toByteSliceSubstraitLiteral(types.UUID(lit[:]))
+ case iceberg.DecimalLiteral:
+ return toDecimalLiteral(lit)
+ }
+ panic(fmt.Errorf("invalid literal type: %s", lit.Type()))
+}
+
+func toSubstraitLiteralSet(typ iceberg.Type, lits []iceberg.Literal)
expr.ListLiteralValue {
+ if len(lits) == 0 {
+ return nil
+ }
+
+ out := make([]expr.Literal, len(lits))
+ for i, l := range lits {
+ out[i] = toSubstraitLiteral(typ, l)
+ }
+ return out
+}
+
+func (t *toSubstraitExpr) getRef(ref iceberg.BoundReference) expr.Reference {
+ updatedRef, err := iceberg.Reference(ref.Field().Name).Bind(t.schema,
t.caseSensitive)
+ if err != nil {
+ panic(err)
+ }
+
+ path := updatedRef.Ref().PosPath()
+ out := expr.NewStructFieldRef(int32(path[0]))
+ if len(path) == 1 {
+ return out
+ }
+
+ cur := out
+ for _, p := range path[1:] {
+ next := expr.NewStructFieldRef(int32(p))
+ cur.Child, cur = next, next
+ }
+ return out
+}
+
+func (t *toSubstraitExpr) makeSetFunc(id extensions.ID, term
iceberg.BoundTerm, lits iceberg.Set[iceberg.Literal]) expr.Builder {
+ val := toSubstraitLiteralSet(term.Type(), lits.Members())
+ return t.bldr.ScalarFunc(id).Args(t.bldr.RootRef(t.getRef(term.Ref())),
+ t.bldr.Literal(expr.NewNestedLiteral(val, false)))
+}
+
+func (t *toSubstraitExpr) VisitIn(term iceberg.BoundTerm, lits
iceberg.Set[iceberg.Literal]) expr.Builder {
+ return t.makeSetFunc(isInID, term, lits)
+}
+
+func (t *toSubstraitExpr) VisitNotIn(term iceberg.BoundTerm, lits
iceberg.Set[iceberg.Literal]) expr.Builder {
+ return t.bldr.ScalarFunc(notID).Args(t.makeSetFunc(isInID, term,
lits).(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) makeRefFunc(id extensions.ID, term
iceberg.BoundTerm) expr.Builder {
+ return t.bldr.ScalarFunc(id).Args(t.bldr.RootRef(t.getRef(term.Ref())))
+}
+
+func (t *toSubstraitExpr) VisitIsNan(term iceberg.BoundTerm) expr.Builder {
+ return t.makeRefFunc(isNaNID, term)
+}
+
+func (t *toSubstraitExpr) VisitNotNan(term iceberg.BoundTerm) expr.Builder {
+ return t.bldr.ScalarFunc(notID).Args(
+ t.makeRefFunc(isNaNID, term).(expr.FuncArgBuilder))
+}
+
+func (t *toSubstraitExpr) VisitIsNull(term iceberg.BoundTerm) expr.Builder {
+ return t.makeRefFunc(isNullID, term)
+}
+
+func (t *toSubstraitExpr) VisitNotNull(term iceberg.BoundTerm) expr.Builder {
+ return t.makeRefFunc(isNotNullID, term)
+}
+
+func (t *toSubstraitExpr) makeLitFunc(id extensions.ID, term
iceberg.BoundTerm, lit iceberg.Literal) expr.Builder {
+ return t.bldr.ScalarFunc(id).Args(t.bldr.RootRef(t.getRef(term.Ref())),
+ t.bldr.Literal(toSubstraitLiteral(term.Type(), lit)))
+}
+
+func (t *toSubstraitExpr) VisitEqual(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(equalID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitNotEqual(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(notEqualID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitGreaterEqual(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(greaterEqualID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitGreater(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(greaterID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitLessEqual(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(lessEqualID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitLess(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(lessID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitStartsWith(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.makeLitFunc(startsWithID, term, lit)
+}
+
+func (t *toSubstraitExpr) VisitNotStartsWith(term iceberg.BoundTerm, lit
iceberg.Literal) expr.Builder {
+ return t.bldr.ScalarFunc(notID).Args(
+ t.makeLitFunc(startsWithID, term, lit).(expr.FuncArgBuilder))
+}
diff --git a/table/substrait/substrait_test.go
b/table/substrait/substrait_test.go
new file mode 100644
index 0000000..387a5a8
--- /dev/null
+++ b/table/substrait/substrait_test.go
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package substrait_test
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/table/substrait"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/substrait-io/substrait-go/types"
+)
+
+func TestRefTypes(t *testing.T) {
+ sc := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "a", Type:
iceberg.PrimitiveTypes.Bool},
+ iceberg.NestedField{ID: 2, Name: "b", Type:
iceberg.PrimitiveTypes.Int32},
+ iceberg.NestedField{ID: 3, Name: "c", Type:
iceberg.PrimitiveTypes.Int64},
+ iceberg.NestedField{ID: 4, Name: "d", Type:
iceberg.PrimitiveTypes.Float32},
+ iceberg.NestedField{ID: 5, Name: "e", Type:
iceberg.PrimitiveTypes.Float64},
+ iceberg.NestedField{ID: 6, Name: "f", Type:
iceberg.PrimitiveTypes.Date},
+ iceberg.NestedField{ID: 7, Name: "g", Type:
iceberg.PrimitiveTypes.Time},
+ iceberg.NestedField{ID: 8, Name: "h", Type:
iceberg.PrimitiveTypes.Timestamp},
+ iceberg.NestedField{ID: 9, Name: "i", Type:
iceberg.DecimalTypeOf(9, 2)},
+ iceberg.NestedField{ID: 10, Name: "j", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 11, Name: "k", Type:
iceberg.PrimitiveTypes.Binary},
+ iceberg.NestedField{ID: 12, Name: "l", Type:
iceberg.PrimitiveTypes.UUID},
+ iceberg.NestedField{ID: 13, Name: "m", Type:
iceberg.FixedTypeOf(5)})
+
+ tests := []struct {
+ name string
+ exp types.Type
+ }{
+ {"a", &types.BooleanType{}},
+ {"b", &types.Int32Type{}},
+ {"c", &types.Int64Type{}},
+ {"d", &types.Float32Type{}},
+ {"e", &types.Float64Type{}},
+ {"f", &types.DateType{}},
+ {"g", &types.TimeType{}},
+ {"h", &types.TimestampType{}},
+ {"i", &types.DecimalType{Scale: 2, Precision: 9}},
+ {"j", &types.StringType{}},
+ {"k", &types.BinaryType{}},
+ {"l", &types.UUIDType{}},
+ {"m", &types.FixedBinaryType{Length: 5}},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ref, err :=
iceberg.IsNull(iceberg.Reference(tt.name)).Bind(sc, true)
+ require.NoError(t, err)
+
+ idx := ref.(iceberg.BoundPredicate).Term().Ref().Pos()
+
+ _, converted, err := substrait.ConvertExpr(sc, ref,
true)
+ require.NoError(t, err)
+
+ assert.Equal(t, fmt.Sprintf("is_null(.field(%d) => %s)
=> boolean", idx,
+
tt.exp.WithNullability(types.NullabilityNullable)), converted.String())
+ })
+ }
+}
+
+var (
+ tableSchemaSimple = iceberg.NewSchemaWithIdentifiers(1,
+ []int{2},
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 3, Name: "baz", Type:
iceberg.PrimitiveTypes.Bool},
+ )
+
+ doubleSchema = iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.Float64})
+)
+
+func TestExprs(t *testing.T) {
+ ref := iceberg.Reference("foo")
+
+ tests := []struct {
+ e iceberg.BooleanExpression
+ schema *iceberg.Schema
+ expected string
+ }{
+ {iceberg.IsNull(ref), tableSchemaSimple, "is_null(.field(0) => string?) => boolean"},
+ {iceberg.NotNull(ref), tableSchemaSimple, "is_not_null(.field(0) => string?) => boolean"},
+ {iceberg.IsNaN(ref), doubleSchema, "is_nan(.field(0) => fp64?) => boolean?"},
+ {iceberg.NotNaN(ref), doubleSchema, "not(is_nan(.field(0) => fp64?) => boolean?) => boolean?"},
+ {iceberg.EqualTo(ref, "hello"), tableSchemaSimple, "equal(.field(0) => string?, string(hello)) => boolean?"},
+ {iceberg.NotEqualTo(ref, "hello"), tableSchemaSimple, "not_equal(.field(0) => string?, string(hello)) => boolean?"},
+ {iceberg.GreaterThanEqual(ref, "hello"), tableSchemaSimple, "gte(.field(0) => string?, string(hello)) => boolean?"},
+ {iceberg.GreaterThan(ref, "hello"), tableSchemaSimple, "gt(.field(0) => string?, string(hello)) => boolean?"},
+ {iceberg.LessThanEqual(ref, "hello"), tableSchemaSimple, "lte(.field(0) => string?, string(hello)) => boolean?"},
+ {iceberg.LessThan(ref, "hello"), tableSchemaSimple, "lt(.field(0) => string?, string(hello)) => boolean?"},
+ {iceberg.StartsWith(ref, "he"), tableSchemaSimple, "starts_with(.field(0) => string?, string(he)) => boolean?"},
+ {iceberg.NotStartsWith(ref, "he"), tableSchemaSimple, "not(starts_with(.field(0) => string?, string(he)) => boolean?) => boolean?"},
+ {iceberg.NewAnd(iceberg.EqualTo(ref, "hello"), iceberg.IsNull(ref)), tableSchemaSimple,
+ "and(equal(.field(0) => string?, string(hello)) => boolean?, is_null(.field(0) => string?) => boolean) => boolean?"},
+ {iceberg.NewOr(iceberg.EqualTo(ref, "hello"), iceberg.IsNull(ref)), tableSchemaSimple,
+ "or(equal(.field(0) => string?, string(hello)) => boolean?, is_null(.field(0) => string?) => boolean) => boolean?"},
+ {iceberg.NewNot(iceberg.EqualTo(ref, "hello")), tableSchemaSimple,
+ "not(equal(.field(0) => string?, string(hello)) => boolean?) => boolean?"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.e.String(), func(t *testing.T) {
+ bound, err := iceberg.BindExpr(tt.schema, tt.e, false)
+ require.NoError(t, err)
+
+ _, result, err := substrait.ConvertExpr(tt.schema, bound, true)
+ require.NoError(t, err)
+ assert.Equal(t, tt.expected, result.String())
+ })
+ }
+}
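
The tests above double as a reference for the new conversion surface: unary, literal, and compound predicates all pass through substrait.ConvertExpr once bound to a schema. A minimal standalone sketch of that flow, assuming the module path github.com/apache/iceberg-go and discarding the extension-set value that ConvertExpr returns first:

    package main

    import (
        "fmt"

        "github.com/apache/iceberg-go"
        "github.com/apache/iceberg-go/table/substrait"
    )

    func main() {
        sc := iceberg.NewSchema(0,
            iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String})

        // bind the untyped predicate against the schema (case-sensitive)
        bound, err := iceberg.BindExpr(sc,
            iceberg.EqualTo(iceberg.Reference("foo"), "hello"), true)
        if err != nil {
            panic(err)
        }

        // convert the bound iceberg expression into a substrait expression
        _, converted, err := substrait.ConvertExpr(sc, bound, true)
        if err != nil {
            panic(err)
        }

        fmt.Println(converted.String())
        // per the test table above: equal(.field(0) => string?, string(hello)) => boolean?
    }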
diff --git a/table/table.go b/table/table.go
index 7064add..350782a 100644
--- a/table/table.go
+++ b/table/table.go
@@ -60,17 +60,78 @@ func (t Table) Schemas() map[int]*iceberg.Schema {
return m
}
-func (t Table) Scan(rowFilter iceberg.BooleanExpression, snapshotID int64, caseSensitive bool, fields ...string) *Scan {
+type ScanOption func(*Scan)
+
+func noopOption(*Scan) {}
+
+func WithSelectedFields(fields ...string) ScanOption {
+ if len(fields) == 0 || slices.Contains(fields, "*") {
+ return noopOption
+ }
+
+ return func(scan *Scan) {
+ scan.selectedFields = fields
+ }
+}
+
+func WithRowFilter(e iceberg.BooleanExpression) ScanOption {
+ if e == nil || e.Equals(iceberg.AlwaysTrue{}) {
+ return noopOption
+ }
+
+ return func(scan *Scan) {
+ scan.rowFilter = e
+ }
+}
+
+func WithSnapshotID(n int64) ScanOption {
+ if n == 0 {
+ return noopOption
+ }
+
+ return func(scan *Scan) {
+ scan.snapshotID = &n
+ }
+}
+
+func WithCaseSensitive(b bool) ScanOption {
+ return func(scan *Scan) {
+ scan.caseSensitive = b
+ }
+}
+
+func WithLimit(n int64) ScanOption {
+ if n < 0 {
+ return noopOption
+ }
+
+ return func(scan *Scan) {
+ scan.limit = n
+ }
+}
+
+func WithOptions(opts iceberg.Properties) ScanOption {
+ if opts == nil {
+ return noopOption
+ }
+
+ return func(scan *Scan) {
+ scan.options = opts
+ }
+}
+
+func (t Table) Scan(opts ...ScanOption) *Scan {
s := &Scan{
metadata: t.metadata,
io: t.fs,
- rowFilter: rowFilter,
- selectedFields: fields,
- caseSensitive: caseSensitive,
+ rowFilter: iceberg.AlwaysTrue{},
+ selectedFields: []string{"*"},
+ caseSensitive: true,
+ limit: ScanNoLimit,
}
- if snapshotID != 0 {
- s.snapshotID = &snapshotID
+ for _, opt := range opts {
+ opt(s)
}
s.partitionFilters = newKeyDefaultMapWrapErr(s.buildPartitionProjection)
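
With this hunk, Table.Scan moves from a fixed positional signature to functional options: omitted options keep the defaults set in the struct literal (AlwaysTrue filter, all fields, case-sensitive, ScanNoLimit), and options given a zero value collapse to noopOption. A sketch of a caller, assuming tbl is a table.Table already loaded from a catalog and that the column names here are placeholders:

    scan := tbl.Scan(
        table.WithRowFilter(iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2024-01-01")),
        table.WithSelectedFields("id", "ts"),
        table.WithLimit(100),
    )
    // WithSnapshotID(0) or WithLimit(-1) would be no-ops, so callers can
    // pass values through unconditionally and still get the defaults.
    _ = scan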
diff --git a/types.go b/types.go
index ebc8849..37723e8 100644
--- a/types.go
+++ b/types.go
@@ -36,6 +36,14 @@ var (
type Properties map[string]string
+// Get returns the value of the key if it exists, otherwise it returns the default value.
+func (p Properties) Get(key, defVal string) string {
+ if v, ok := p[key]; ok {
+ return v
+ }
+ return defVal
+}
+
// Type is an interface representing any of the available iceberg types,
// such as primitives (int32/int64/etc.) or nested types (list/struct/map).
type Type interface {
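
The new Properties.Get replaces the comma-ok lookup idiom at call sites that need a fallback. A small illustration (the keys below are just placeholders):

    props := iceberg.Properties{"format": "parquet"}

    props.Get("format", "orc")  // returns "parquet": the key is present
    props.Get("missing", "orc") // returns "orc": falls back to the default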
diff --git a/visitors.go b/visitors.go
index bb0caab..f93c75a 100644
--- a/visitors.go
+++ b/visitors.go
@@ -444,3 +444,56 @@ func (expressionFieldIDs) VisitBound(pred BoundPredicate) map[int]struct{} {
pred.Ref().Field().ID: {},
}
}
+
+// TranslateColumnNames converts the names of columns in an expression by looking up
+// the field IDs in the file schema. If columns don't exist they are replaced with
+// AlwaysFalse or AlwaysTrue depending on the operator.
+func TranslateColumnNames(expr BooleanExpression, fileSchema *Schema) (BooleanExpression, error) {
+ return VisitExpr(expr, columnNameTranslator{fileSchema: fileSchema})
+}
+
+type columnNameTranslator struct {
+ fileSchema *Schema
+}
+
+func (columnNameTranslator) VisitTrue() BooleanExpression  { return AlwaysTrue{} }
+func (columnNameTranslator) VisitFalse() BooleanExpression { return AlwaysFalse{} }
+func (columnNameTranslator) VisitNot(child BooleanExpression) BooleanExpression {
+ return NewNot(child)
+}
+
+func (columnNameTranslator) VisitAnd(left, right BooleanExpression) BooleanExpression {
+ return NewAnd(left, right)
+}
+
+func (columnNameTranslator) VisitOr(left, right BooleanExpression) BooleanExpression {
+ return NewOr(left, right)
+}
+
+func (columnNameTranslator) VisitUnbound(pred UnboundPredicate) BooleanExpression {
+ panic(fmt.Errorf("%w: expected bound predicate, got: %s", ErrInvalidArgument, pred.Term()))
+}
+
+func (c columnNameTranslator) VisitBound(pred BoundPredicate) BooleanExpression {
+ fileColName, found := c.fileSchema.FindColumnName(pred.Term().Ref().Field().ID)
+ if !found {
+ // in the case of schema evolution, the column might not be present
+ // in the file schema when reading older data
+ if pred.Op() == OpIsNull {
+ return AlwaysTrue{}
+ }
+ return AlwaysFalse{}
+ }
+
+ ref := Reference(fileColName)
+ switch p := pred.(type) {
+ case BoundUnaryPredicate:
+ return p.AsUnbound(ref)
+ case BoundLiteralPredicate:
+ return p.AsUnbound(ref, p.Literal())
+ case BoundSetPredicate:
+ return p.AsUnbound(ref, p.Literals().Members())
+ default:
+ panic(fmt.Errorf("%w: unsupported predicate: %s", ErrNotImplemented, pred))
+ }
+}
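
To make the schema-evolution fallback concrete: a predicate bound against the current table schema may reference a column that an older data file never had. A sketch with hypothetical schemas; field ID 2 is missing from the file schema, and since the operator is not OpIsNull the predicate collapses to AlwaysFalse for that file:

    tableSchema := iceberg.NewSchema(1,
        iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64},
        iceberg.NestedField{ID: 2, Name: "added_later", Type: iceberg.PrimitiveTypes.String})

    // an older data file written before "added_later" existed
    fileSchema := iceberg.NewSchema(1,
        iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64})

    bound, err := iceberg.BindExpr(tableSchema,
        iceberg.EqualTo(iceberg.Reference("added_later"), "x"), true)
    if err != nil {
        panic(err)
    }

    // the missing column turns the whole predicate into AlwaysFalse for this file
    translated, err := iceberg.TranslateColumnNames(bound, fileSchema)
    if err != nil {
        panic(err)
    }
    fmt.Println(translated.Equals(iceberg.AlwaysFalse{})) // true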