This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6e1ba00 feat(table/scanner): Initial pass for planning a scan and
returning the files to use (#118)
6e1ba00 is described below
commit 6e1ba00940e03992f5546dd0ca110a8096866c21
Author: Matt Topol <[email protected]>
AuthorDate: Fri Aug 23 09:13:10 2024 -0400
feat(table/scanner): Initial pass for planning a scan and returning the
files to use (#118)
---
.github/workflows/go-ci.yml | 4 +-
.github/workflows/go-integration.yml | 71 ++
dev/Dockerfile | 25 +
dev/docker-compose.yml | 8 +-
dev/provision.py | 379 ++++++++++
exprs.go | 44 +-
go.mod | 1 +
go.sum | 2 +
io/s3.go | 8 +-
manifest.go | 2 -
table/evaluators.go | 635 +++++++++++++++++
table/evaluators_test.go | 1294 ++++++++++++++++++++++++++++++++++
table/scanner.go | 395 +++++++++++
table/scanner_test.go | 124 ++++
table/table.go | 17 +
transforms.go | 695 ++++++++++++++++++
16 files changed, 3694 insertions(+), 10 deletions(-)
diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml
index a6014fd..e82a3ed 100644
--- a/.github/workflows/go-ci.yml
+++ b/.github/workflows/go-ci.yml
@@ -20,7 +20,7 @@ name: Go
on:
push:
branches:
- - 'master'
+ - 'main'
tags:
- 'v**'
pull_request:
@@ -39,7 +39,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- go: [ '1.21' ]
+ go: [ '1.21', '1.22' ]
os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ]
steps:
- uses: actions/checkout@v3
diff --git a/.github/workflows/go-integration.yml
b/.github/workflows/go-integration.yml
new file mode 100644
index 0000000..b736ef3
--- /dev/null
+++ b/.github/workflows/go-integration.yml
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "Go Integration"
+
+on:
+ push:
+ branches:
+ - 'main'
+ tags:
+ - 'v**'
+ pull_request:
+
+concurrency:
+ group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{
github.workflow }}
+ cancel-in-progress: ${{ github.event_name == 'pull_request' }}
+
+permissions:
+ contents: read
+
+jobs:
+ integration-test:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ fetch-depth: 2
+ - name: Install Go
+ uses: actions/setup-go@v4
+ with:
+ go-version: 1.22
+ cache: true
+ cache-dependency-path: go.sum
+
+ - name: Start docker
+ run: |
+ docker compose -f dev/docker-compose.yml up -d
+ sleep 10
+ - name: Provision Tables
+ run: |
+ docker compose -f dev/docker-compose.yml exec -T spark-iceberg
ipython ./provision.py
+ sleep 10
+
+ - name: Get minio container IP
+ run: |
+ echo "AWS_S3_ENDPOINT=http://$(docker inspect -f
'{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' minio):9000" >>
$GITHUB_ENV
+
+ - name: Run integration tests
+ env:
+ AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}"
+ AWS_REGION: "us-east-1"
+ run: |
+ go test -tags integration -v -run="^TestScanner" ./table
+
+ - name: Show debug logs
+ if: ${{ failure() }}
+ run: docker compose -f dev/docker-compose.yml logs
diff --git a/dev/Dockerfile b/dev/Dockerfile
new file mode 100644
index 0000000..c8b9e65
--- /dev/null
+++ b/dev/Dockerfile
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM tabulario/spark-iceberg
+
+RUN pip3 install -q ipython
+RUN pip3 install pyiceberg[s3fs,hive]
+RUN pip3 install pyarrow
+
+COPY provision.py .
+
+ENTRYPOINT ["./entrypoint.sh"]
+CMD ["notebook"]
\ No newline at end of file
diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml
index 2978c42..63821cd 100644
--- a/dev/docker-compose.yml
+++ b/dev/docker-compose.yml
@@ -18,11 +18,12 @@ version: "3"
services:
spark-iceberg:
- image: tabulario/spark-iceberg
+ image: pyiceberg-spark
container_name: spark-iceberg
+ build: .
networks:
iceberg_net:
- depends_on:
+ depends_on:
- rest
- minio
volumes:
@@ -85,5 +86,6 @@ services:
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
+
networks:
- iceberg_net:
\ No newline at end of file
+ iceberg_net:
diff --git a/dev/provision.py b/dev/provision.py
new file mode 100644
index 0000000..77906c1
--- /dev/null
+++ b/dev/provision.py
@@ -0,0 +1,379 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import current_date, date_add, expr
+
+from pyiceberg.catalog import load_catalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import FixedType, NestedField, UUIDType
+
+spark = SparkSession.builder.getOrCreate()
+
+catalogs = {
+ 'rest': load_catalog(
+ "rest",
+ **{
+ "type": "rest",
+ "uri": "http://rest:8181",
+ "s3.endpoint": "http://minio:9000",
+ "s3.access-key-id": "admin",
+ "s3.secret-access-key": "password",
+ },
+ ),
+}
+
+for catalog_name, catalog in catalogs.items():
+ spark.sql(
+ f"""
+ CREATE DATABASE IF NOT EXISTS default;
+ """
+ )
+
+ schema = Schema(
+ NestedField(field_id=1, name="uuid_col", field_type=UUIDType(),
required=False),
+ NestedField(field_id=2, name="fixed_col", field_type=FixedType(25),
required=False),
+ )
+
+
catalog.create_table(identifier=f"default.test_uuid_and_fixed_unpartitioned",
schema=schema)
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES
+ ('102cb62f-e6f8-4eb0-9973-d9b012ff0967',
CAST('1234567890123456789012345' AS BINARY)),
+ ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226',
CAST('1231231231231231231231231' AS BINARY)),
+ ('639cccce-c9d2-494a-a78c-278ab234f024',
CAST('12345678901234567ass12345' AS BINARY)),
+ ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b',
CAST('asdasasdads12312312312111' AS BINARY)),
+ ('923dae77-83d6-47cd-b4b0-d383e64ee57e',
CAST('qweeqwwqq1231231231231111' AS BINARY));
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.test_null_nan
+ USING iceberg
+ AS SELECT
+ 1 AS idx,
+ float('NaN') AS col_numeric
+ UNION ALL SELECT
+ 2 AS idx,
+ null AS col_numeric
+ UNION ALL SELECT
+ 3 AS idx,
+ 1 AS col_numeric;
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.test_null_nan_rewritten
+ USING iceberg
+ AS SELECT * FROM default.test_null_nan;
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.test_limit as
+ SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS
idx;
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.test_positional_mor_deletes (
+ dt date,
+ number integer,
+ letter string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read',
+ 'format-version'='2'
+ );
+ """
+ )
+
+ # Partitioning is not really needed, but there is a bug:
+ # https://github.com/apache/iceberg/pull/7685
+ spark.sql(f"ALTER TABLE default.test_positional_mor_deletes ADD PARTITION
FIELD years(dt) AS dt_years")
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_positional_mor_deletes
+ VALUES
+ (CAST('2023-03-01' AS date), 1, 'a'),
+ (CAST('2023-03-02' AS date), 2, 'b'),
+ (CAST('2023-03-03' AS date), 3, 'c'),
+ (CAST('2023-03-04' AS date), 4, 'd'),
+ (CAST('2023-03-05' AS date), 5, 'e'),
+ (CAST('2023-03-06' AS date), 6, 'f'),
+ (CAST('2023-03-07' AS date), 7, 'g'),
+ (CAST('2023-03-08' AS date), 8, 'h'),
+ (CAST('2023-03-09' AS date), 9, 'i'),
+ (CAST('2023-03-10' AS date), 10, 'j'),
+ (CAST('2023-03-11' AS date), 11, 'k'),
+ (CAST('2023-03-12' AS date), 12, 'l');
+ """
+ )
+
+ spark.sql(f"ALTER TABLE default.test_positional_mor_deletes CREATE TAG
tag_12")
+
+ spark.sql(f"ALTER TABLE default.test_positional_mor_deletes CREATE BRANCH
without_5")
+
+ spark.sql(f"DELETE FROM
default.test_positional_mor_deletes.branch_without_5 WHERE number = 5")
+
+ spark.sql(f"DELETE FROM default.test_positional_mor_deletes WHERE number =
9")
+
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.test_positional_mor_double_deletes (
+ dt date,
+ number integer,
+ letter string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read',
+ 'format-version'='2'
+ );
+ """
+ )
+
+ spark.sql(f"ALTER TABLE default.test_positional_mor_double_deletes ADD
PARTITION FIELD years(dt) AS dt_years")
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_positional_mor_double_deletes
+ VALUES
+ (CAST('2023-03-01' AS date), 1, 'a'),
+ (CAST('2023-03-02' AS date), 2, 'b'),
+ (CAST('2023-03-03' AS date), 3, 'c'),
+ (CAST('2023-03-04' AS date), 4, 'd'),
+ (CAST('2023-03-05' AS date), 5, 'e'),
+ (CAST('2023-03-06' AS date), 6, 'f'),
+ (CAST('2023-03-07' AS date), 7, 'g'),
+ (CAST('2023-03-08' AS date), 8, 'h'),
+ (CAST('2023-03-09' AS date), 9, 'i'),
+ (CAST('2023-03-10' AS date), 10, 'j'),
+ (CAST('2023-03-11' AS date), 11, 'k'),
+ (CAST('2023-03-12' AS date), 12, 'l');
+ """
+ )
+
+ spark.sql(f"DELETE FROM default.test_positional_mor_double_deletes WHERE
number = 9")
+
+ spark.sql(f"DELETE FROM default.test_positional_mor_double_deletes WHERE
letter == 'f'")
+
+ all_types_dataframe = (
+ spark.range(0, 5, 1, 5)
+ .withColumnRenamed("id", "longCol")
+ .withColumn("intCol", expr("CAST(longCol AS INT)"))
+ .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+ .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+ .withColumn("dateCol", date_add(current_date(), 1))
+ .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+ .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+ .withColumn("booleanCol", expr("longCol > 5"))
+ .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
+ .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
+ .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
+ .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
+ .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
+ .withColumn("arrayCol", expr("ARRAY(longCol)"))
+ .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))
+ )
+
+
all_types_dataframe.writeTo(f"default.test_all_types").tableProperty("format-version",
"2").partitionedBy(
+ "intCol"
+ ).createOrReplace()
+
+ for table_name, partition in [
+ ("test_partitioned_by_identity", "ts"),
+ ("test_partitioned_by_years", "years(dt)"),
+ ("test_partitioned_by_months", "months(dt)"),
+ ("test_partitioned_by_days", "days(ts)"),
+ ("test_partitioned_by_hours", "hours(ts)"),
+ ("test_partitioned_by_truncate", "truncate(1, letter)"),
+ ("test_partitioned_by_bucket", "bucket(16, number)"),
+ ]:
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.{table_name} (
+ dt date,
+ ts timestamp,
+ number integer,
+ letter string
+ )
+ USING iceberg;
+ """
+ )
+
+ spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD
{partition}")
+
+ spark.sql(
+ f"""
+ INSERT INTO default.{table_name}
+ VALUES
+ (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS
timestamp), 1, 'a'),
+ (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS
timestamp), 2, 'b'),
+ (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS
timestamp), 3, 'c'),
+ (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS
timestamp), 4, 'd'),
+ (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS
timestamp), 5, 'e'),
+ (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS
timestamp), 6, 'f'),
+ (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS
timestamp), 7, 'g'),
+ (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS
timestamp), 8, 'h'),
+ (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS
timestamp), 9, 'i'),
+ (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS
timestamp), 10, 'j'),
+ (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS
timestamp), 11, 'k'),
+ (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS
timestamp), 12, 'l');
+ """
+ )
+
+ # There is an issue with CREATE OR REPLACE
+ # https://github.com/apache/iceberg/issues/8756
+ spark.sql(f"DROP TABLE IF EXISTS default.test_table_version")
+
+ spark.sql(
+ f"""
+ CREATE TABLE default.test_table_version (
+ dt date,
+ number integer,
+ letter string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'format-version'='1'
+ );
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE TABLE default.test_table_sanitized_character (
+ `letter/abc` string
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'format-version'='1'
+ );
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_sanitized_character
+ VALUES
+ ('123')
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_sanitized_character
+ VALUES
+ ('123')
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE TABLE default.test_table_add_column (
+ a string
+ )
+ USING iceberg
+ """
+ )
+
+ spark.sql(f"INSERT INTO default.test_table_add_column VALUES ('1')")
+
+ spark.sql(f"ALTER TABLE default.test_table_add_column ADD COLUMN b string")
+
+ spark.sql(f"INSERT INTO default.test_table_add_column VALUES ('2', '2')")
+
+ spark.sql(
+ f"""
+ CREATE TABLE default.test_table_empty_list_and_map (
+ col_list array<int>,
+ col_map map<int, int>,
+ col_list_with_struct array<struct<test:int>>
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'format-version'='1'
+ );
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_empty_list_and_map
+ VALUES (null, null, null),
+ (array(), map(), array(struct(1)))
+ """
+ )
+
+ spark.sql(
+ f"""
+ CREATE OR REPLACE TABLE default.test_table_snapshot_operations (
+ number integer
+ )
+ USING iceberg
+ TBLPROPERTIES (
+ 'format-version'='2'
+ );
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_snapshot_operations
+ VALUES (1)
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_snapshot_operations
+ VALUES (2)
+ """
+ )
+
+ spark.sql(
+ f"""
+ DELETE FROM default.test_table_snapshot_operations
+ WHERE number = 2
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_snapshot_operations
+ VALUES (3)
+ """
+ )
+
+ spark.sql(
+ f"""
+ INSERT INTO default.test_table_snapshot_operations
+ VALUES (4)
+ """
+ )
\ No newline at end of file
diff --git a/exprs.go b/exprs.go
index bc451ca..4bdb8c1 100644
--- a/exprs.go
+++ b/exprs.go
@@ -980,8 +980,50 @@ func (bsp *boundSetPredicate[T]) String() string {
return fmt.Sprintf("Bound%s(term=%s, {%v})", bsp.op, bsp.term,
bsp.lits.Members())
}
+// AsUnbound converts this bound set predicate into an unbound predicate on
+// r using the literals lits. When the literal set built from lits has
+// exactly one member, the set test is simplified to a scalar comparison:
+// OpIn becomes OpEQ and OpNotIn becomes OpNEQ. Any other case keeps the
+// set form.
func (bsp *boundSetPredicate[T]) AsUnbound(r Reference, lits []Literal)
UnboundPredicate {
- return &unboundSetPredicate{op: bsp.op, term: r, lits:
newLiteralSet(lits...)}
+ litSet := newLiteralSet(lits...)
+ // a one-member set is expressed with lits[0] as the scalar operand
+ if litSet.Len() == 1 {
+ switch bsp.op {
+ case OpIn:
+ return LiteralPredicate(OpEQ, r, lits[0])
+ case OpNotIn:
+ return LiteralPredicate(OpNEQ, r, lits[0])
+ }
+ }
+
+ return &unboundSetPredicate{op: bsp.op, term: r, lits: litSet}
}
+
+// Literals returns the set of literals this predicate tests membership against.
func (bsp *boundSetPredicate[T]) Literals() Set[Literal] {
return bsp.lits
}
+
+// BoundTransform is a bound term that applies a Transform to the value
+// produced by another bound term.
+type BoundTransform struct {
+ transform Transform
+ term BoundTerm
+}
+
+func (*BoundTransform) isTerm() {}
+func (b *BoundTransform) String() string {
+ return fmt.Sprintf("BoundTransform(transform=%s, term=%s)",
+ b.transform, b.term)
+}
+
+// Ref returns the bound reference of the underlying term.
+func (b *BoundTransform) Ref() BoundReference { return b.term.Ref() }
+// Type returns the transform's result type for the underlying term's type.
+func (b *BoundTransform) Type() Type { return
b.transform.ResultType(b.term.Type()) }
+
+// Equals reports whether other is also a *BoundTransform with an equal
+// transform and an equal underlying term.
+func (b *BoundTransform) Equals(other BoundTerm) bool {
+ rhs, ok := other.(*BoundTransform)
+ if !ok {
+ return false
+ }
+
+ return b.transform.Equals(rhs.transform) && b.term.Equals(rhs.term)
+}
+
+// evalToLiteral evaluates the underlying term against st and applies the
+// transform to the result.
+func (b *BoundTransform) evalToLiteral(st structLike) Optional[Literal] {
+ return b.transform.Apply(b.term.evalToLiteral(st))
+}
+
+// evalIsNull reports whether the transformed value is absent (not Valid).
+func (b *BoundTransform) evalIsNull(st structLike) bool {
+ return !b.evalToLiteral(st).Valid
+}
diff --git a/go.mod b/go.mod
index 49dafc3..8a344c5 100644
--- a/go.mod
+++ b/go.mod
@@ -32,6 +32,7 @@ require (
github.com/hamba/avro/v2 v2.23.0
github.com/pterm/pterm v0.12.79
github.com/stretchr/testify v1.9.0
+ github.com/twmb/murmur3 v1.1.8
github.com/wolfeidau/s3iofs v1.5.2
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
)
diff --git a/go.sum b/go.sum
index 33f11eb..5fd4ae1 100644
--- a/go.sum
+++ b/go.sum
@@ -130,6 +130,8 @@ github.com/stretchr/testify v1.6.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0
h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
+github.com/twmb/murmur3 v1.1.8/go.mod
h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/wolfeidau/s3iofs v1.5.2
h1:2dmzSxdrSY29GsILVheJqBbURVQX3KglggSBtVWCYj4=
github.com/wolfeidau/s3iofs v1.5.2/go.mod
h1:fPAKzdWmZ1Z2L9vnqL6d1eb7pVsUgkUstxQUkG1HIHA=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod
h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
diff --git a/io/s3.go b/io/s3.go
index 6702080..7396130 100644
--- a/io/s3.go
+++ b/io/s3.go
@@ -64,9 +64,13 @@ func createS3FileIO(parsed *url.URL, props
map[string]string) (IO, error) {
}
if region, ok := props[S3Region]; ok {
- opts = append(opts, func(o *s3.Options) { o.Region = region })
+ opts = append(opts, func(o *s3.Options) {
+ o.Region = region
+ })
} else if region, ok := props["client.region"]; ok {
- opts = append(opts, func(o *s3.Options) { o.Region = region })
+ opts = append(opts, func(o *s3.Options) {
+ o.Region = region
+ })
}
accessKey, secretAccessKey := props[S3AccessKeyID],
props[S3SecretAccessKey]
diff --git a/manifest.go b/manifest.go
index 57465f1..9a0d8b7 100644
--- a/manifest.go
+++ b/manifest.go
@@ -932,8 +932,6 @@ type DataFile interface {
// SortOrderID returns the id representing the sort order for this
// file, or nil if there is no sort order.
SortOrderID() *int
-
- setFieldNameToIDMap(map[string]int)
}
// ManifestEntry is an interface for both v1 and v2 manifest entries.
diff --git a/table/evaluators.go b/table/evaluators.go
index 343c8ef..1458c98 100644
--- a/table/evaluators.go
+++ b/table/evaluators.go
@@ -18,6 +18,10 @@
package table
import (
+ "fmt"
+ "math"
+ "slices"
+
"github.com/apache/iceberg-go"
"github.com/google/uuid"
)
@@ -61,6 +65,45 @@ func (m *manifestEvalVisitor) Eval(manifest
iceberg.ManifestFile) (bool, error)
return rowsMightMatch, nil
}
+// removeBoundCmp deletes from vals, in place, every literal v for which
+// bound's comparator returns cmpToDelete when called as cmp(bound, v). It
+// returns the shortened slice. bound and every element of vals must be an
+// iceberg.TypedLiteral[T]; otherwise the type assertions panic.
+func removeBoundCmp[T iceberg.LiteralType](bound iceberg.Literal, vals
[]iceberg.Literal, cmpToDelete int) []iceberg.Literal {
+ val := bound.(iceberg.TypedLiteral[T])
+ cmp := val.Comparator()
+
+ return slices.DeleteFunc(vals, func(v iceberg.Literal) bool {
+ return cmp(val.Value(), v.(iceberg.TypedLiteral[T]).Value()) ==
cmpToDelete
+ })
+}
+
+// removeBoundCheck removes from vals every literal whose comparison with
+// bound equals toDelete. It picks the Go value type for removeBoundCmp from
+// bound's Iceberg type and panics for any type not listed in the switch.
+func removeBoundCheck(bound iceberg.Literal, vals []iceberg.Literal, toDelete
int) []iceberg.Literal {
+ switch bound.Type().(type) {
+ case iceberg.BooleanType:
+ return removeBoundCmp[bool](bound, vals, toDelete)
+ case iceberg.Int32Type:
+ return removeBoundCmp[int32](bound, vals, toDelete)
+ case iceberg.Int64Type:
+ return removeBoundCmp[int64](bound, vals, toDelete)
+ case iceberg.Float32Type:
+ return removeBoundCmp[float32](bound, vals, toDelete)
+ case iceberg.Float64Type:
+ return removeBoundCmp[float64](bound, vals, toDelete)
+ case iceberg.DateType:
+ return removeBoundCmp[iceberg.Date](bound, vals, toDelete)
+ case iceberg.TimeType:
+ return removeBoundCmp[iceberg.Time](bound, vals, toDelete)
+ // both timestamp flavors share the iceberg.Timestamp value type
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ return removeBoundCmp[iceberg.Timestamp](bound, vals, toDelete)
+ // binary and fixed literals are both compared as raw bytes
+ case iceberg.BinaryType, iceberg.FixedType:
+ return removeBoundCmp[[]byte](bound, vals, toDelete)
+ case iceberg.StringType:
+ return removeBoundCmp[string](bound, vals, toDelete)
+ case iceberg.UUIDType:
+ return removeBoundCmp[uuid.UUID](bound, vals, toDelete)
+ case iceberg.DecimalType:
+ return removeBoundCmp[iceberg.Decimal](bound, vals, toDelete)
+ }
+ panic("unrecognized type")
+}
+
func allBoundCmp[T iceberg.LiteralType](bound iceberg.Literal, set
iceberg.Set[iceberg.Literal], want int) bool {
val := bound.(iceberg.TypedLiteral[T])
cmp := val.Comparator()
@@ -488,3 +531,595 @@ func (m *manifestEvalVisitor) VisitBound(pred
iceberg.BoundPredicate) bool {
func (m *manifestEvalVisitor) VisitNot(child bool) bool { return !child }
func (m *manifestEvalVisitor) VisitAnd(left, right bool) bool { return left &&
right }
func (m *manifestEvalVisitor) VisitOr(left, right bool) bool { return left ||
right }
+
+// projectionEvaluator holds the state shared by partition-projection
+// visitors:
+//   - the partition spec to project onto,
+//   - the schema used for binding,
+//   - whether name binding is case-sensitive.
+// Its visit methods rebuild AND/OR nodes. They panic on NOT, because the
+// expression must be NOT-rewritten first, and on unbound predicates.
+type projectionEvaluator struct {
+ spec iceberg.PartitionSpec
+ schema *iceberg.Schema
+ caseSensitive bool
+}
+
+func (*projectionEvaluator) VisitTrue() iceberg.BooleanExpression { return
iceberg.AlwaysTrue{} }
+func (*projectionEvaluator) VisitFalse() iceberg.BooleanExpression { return
iceberg.AlwaysFalse{} }
+func (*projectionEvaluator) VisitNot(child iceberg.BooleanExpression)
iceberg.BooleanExpression {
+ panic(fmt.Errorf("%w: cannot project 'not' expression, should be
rewritten %s",
+ iceberg.ErrInvalidArgument, child))
+}
+
+func (*projectionEvaluator) VisitAnd(left, right iceberg.BooleanExpression)
iceberg.BooleanExpression {
+ return iceberg.NewAnd(left, right)
+}
+
+func (*projectionEvaluator) VisitOr(left, right iceberg.BooleanExpression)
iceberg.BooleanExpression {
+ return iceberg.NewOr(left, right)
+}
+
+func (*projectionEvaluator) VisitUnbound(pred iceberg.UnboundPredicate)
iceberg.BooleanExpression {
+ panic(fmt.Errorf("%w: cannot project unbound predicate: %s",
iceberg.ErrInvalidArgument, pred))
+}
+
+// inclusiveProjection projects a row filter onto partition values.
+type inclusiveProjection struct{ projectionEvaluator }
+
+// Project performs three steps:
+//  1. rewrites NOT nodes out of expr,
+//  2. binds the result to the schema,
+//  3. visits it to build an expression over partition fields.
+// Errors from rewriting and binding are returned.
+// NOTE(review): whether panics raised by the visit methods are converted
+// into the returned error depends on iceberg.VisitExpr — confirm.
+func (p *inclusiveProjection) Project(expr iceberg.BooleanExpression)
(iceberg.BooleanExpression, error) {
+ expr, err := iceberg.RewriteNotExpr(expr)
+ if err != nil {
+ return nil, err
+ }
+
+ bound, err := iceberg.BindExpr(p.schema, expr, p.caseSensitive)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.VisitExpr(bound, p)
+}
+
+// VisitBound ANDs together the projection of pred through every partition
+// field whose source column is pred's column. A field whose transform
+// yields a nil projection adds no constraint. With no such fields, the
+// result is AlwaysTrue. Errors from Transform.Project panic.
+func (p *inclusiveProjection) VisitBound(pred iceberg.BoundPredicate)
iceberg.BooleanExpression {
+ parts := p.spec.FieldsBySourceID(pred.Term().Ref().Field().ID)
+
+ var result iceberg.BooleanExpression = iceberg.AlwaysTrue{}
+ for _, part := range parts {
+ // consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
+ // projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
+ // any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
+ //
+ // similarly, if partitioning by day(ts) and hour(ts), the more restrictive
+ // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
+ // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
+ inclProjection, err := part.Transform.Project(part.Name, pred)
+ if err != nil {
+ panic(err)
+ }
+ if inclProjection != nil {
+ result = iceberg.NewAnd(result, inclProjection)
+ }
+ }
+
+ return result
+}
+
+// newInclusiveProjection returns a function that projects row filters over
+// schema s onto the partition spec spec.
+func newInclusiveProjection(s *iceberg.Schema, spec iceberg.PartitionSpec,
caseSensitive bool) func(iceberg.BooleanExpression) (iceberg.BooleanExpression,
error) {
+ return (&inclusiveProjection{
+ projectionEvaluator: projectionEvaluator{
+ schema: s,
+ spec: spec,
+ caseSensitive: caseSensitive,
+ },
+ }).Project
+}
+
+// metricsEvaluator holds per-column statistics of a data file, keyed by
+// field ID. It also provides the boolean combinators shared by metrics-based
+// visitors. Results are rowsMightMatch / rowsCannotMatch. NOT must be
+// rewritten away before visiting.
+type metricsEvaluator struct {
+ valueCounts map[int]int64
+ nullCounts map[int]int64
+ nanCounts map[int]int64
+ lowerBounds map[int][]byte
+ upperBounds map[int][]byte
+}
+
+func (m *metricsEvaluator) VisitTrue() bool { return rowsMightMatch }
+func (m *metricsEvaluator) VisitFalse() bool { return rowsCannotMatch }
+func (m *metricsEvaluator) VisitNot(child bool) bool {
+ panic(fmt.Errorf("%w: NOT should be rewritten %v",
iceberg.ErrInvalidArgument, child))
+}
+func (m *metricsEvaluator) VisitAnd(left, right bool) bool { return left &&
right }
+func (m *metricsEvaluator) VisitOr(left, right bool) bool { return left ||
right }
+
+// containsNullsOnly reports whether the value count and null count of column
+// id are both recorded and equal. Missing statistics yield false.
+func (m *metricsEvaluator) containsNullsOnly(id int) bool {
+ valCount, ok := m.valueCounts[id]
+ if !ok {
+ return false
+ }
+
+ nullCount, ok := m.nullCounts[id]
+ if !ok {
+ return false
+ }
+
+ return valCount == nullCount
+}
+
+// containsNansOnly reports whether the NaN count and value count of column
+// id are both recorded and equal. Missing statistics yield false.
+func (m *metricsEvaluator) containsNansOnly(id int) bool {
+ nanCount, ok := m.nanCounts[id]
+ if !ok {
+ return false
+ }
+
+ valCount, ok := m.valueCounts[id]
+ if !ok {
+ return false
+ }
+
+ return nanCount == valCount
+}
+
+// isNan reports whether v is a float32 or float64 literal holding NaN. All
+// other literal types report false.
+func (m *metricsEvaluator) isNan(v iceberg.Literal) bool {
+ switch v := v.(type) {
+ case iceberg.Float32Literal:
+ return math.IsNaN(float64(v))
+ case iceberg.Float64Literal:
+ return math.IsNaN(float64(v))
+ default:
+ return false
+ }
+}
+
+// newInclusiveMetricsEvaluator rewrites NOT out of expr and binds it to
+// schema s. It returns a function that reports whether a data file might
+// contain rows matching expr, judged from the file's column statistics. When
+// includeEmptyFiles is false, files with a zero record count are rejected
+// without evaluating expr.
+func newInclusiveMetricsEvaluator(s *iceberg.Schema, expr
iceberg.BooleanExpression,
+ caseSensitive bool, includeEmptyFiles bool) (func(iceberg.DataFile)
(bool, error), error) {
+
+ rewritten, err := iceberg.RewriteNotExpr(expr)
+ if err != nil {
+ return nil, err
+ }
+
+ bound, err := iceberg.BindExpr(s, rewritten, caseSensitive)
+ if err != nil {
+ return nil, err
+ }
+
+ return (&inclusiveMetricsEval{
+ st: s.AsStruct(),
+ includeEmptyFiles: includeEmptyFiles,
+ expr: bound,
+ }).Eval, nil
+}
+
+// inclusiveMetricsEval evaluates a bound expression against one data file's
+// statistics at a time. Eval overwrites the embedded statistics on every
+// call, so a single instance must not be used from several goroutines at
+// once.
+type inclusiveMetricsEval struct {
+ metricsEvaluator
+
+ st iceberg.StructType
+ expr iceberg.BooleanExpression
+ includeEmptyFiles bool
+}
+
+// Eval loads file's statistics into the evaluator and visits the bound
+// expression against them.
+func (m *inclusiveMetricsEval) Eval(file iceberg.DataFile) (bool, error) {
+ if !m.includeEmptyFiles && file.Count() == 0 {
+ return rowsCannotMatch, nil
+ }
+
+ m.valueCounts, m.nullCounts = file.ValueCounts(), file.NullValueCounts()
+ m.nanCounts = file.NaNValueCounts()
+ m.lowerBounds, m.upperBounds = file.LowerBoundValues(),
file.UpperBoundValues()
+
+ return iceberg.VisitExpr(m.expr, m)
+}
+
+// mayContainNull reports true when the file has no null-count statistics at
+// all, or when fieldID has a null-count entry.
+func (m *inclusiveMetricsEval) mayContainNull(fieldID int) bool {
+ if m.nullCounts == nil {
+ return true
+ }
+
+ _, ok := m.nullCounts[fieldID]
+ return ok
+}
+
+// VisitUnbound panics: the expression is bound before it is evaluated.
+func (m *inclusiveMetricsEval) VisitUnbound(iceberg.UnboundPredicate) bool {
+ panic("need bound predicate")
+}
+
+// VisitBound dispatches pred to the per-operator Visit* methods below.
+func (m *inclusiveMetricsEval) VisitBound(pred iceberg.BoundPredicate) bool {
+ return iceberg.VisitBoundPredicate(pred, m)
+}
+
+// VisitIsNull rules the file out only when its null count for the column is
+// recorded as zero.
+func (m *inclusiveMetricsEval) VisitIsNull(t iceberg.BoundTerm) bool {
+ fieldID := t.Ref().Field().ID
+ if cnt, exists := m.nullCounts[fieldID]; exists && cnt == 0 {
+ return rowsCannotMatch
+ }
+ return rowsMightMatch
+}
+
+// VisitNotNull rules the file out when the column holds only nulls.
+func (m *inclusiveMetricsEval) VisitNotNull(t iceberg.BoundTerm) bool {
+ // no need to check whether the field is required because binding evaluates
+ // that case. if the column has no non-null values, the expression cannot match
+ fieldID := t.Ref().Field().ID
+ if m.containsNullsOnly(fieldID) {
+ return rowsCannotMatch
+ }
+ return rowsMightMatch
+}
+
+// VisitIsNan rules the file out when the column's NaN count is recorded as
+// zero, or when the column holds only nulls.
+func (m *inclusiveMetricsEval) VisitIsNan(t iceberg.BoundTerm) bool {
+ fieldID := t.Ref().Field().ID
+ if cnt, exists := m.nanCounts[fieldID]; exists && cnt == 0 {
+ return rowsCannotMatch
+ }
+ // when there's no nancounts information but we already know the column
+ // contains only nulls, it's guaranteed that there's no nan value
+ if m.containsNullsOnly(fieldID) {
+ return rowsCannotMatch
+ }
+ return rowsMightMatch
+}
+
+// VisitNotNan rules the file out when every value of the column is NaN.
+func (m *inclusiveMetricsEval) VisitNotNan(t iceberg.BoundTerm) bool {
+ fieldID := t.Ref().Field().ID
+
+ if m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+ return rowsMightMatch
+}
+
+// VisitLess evaluates t < lit. The file is ruled out when the column holds
+// only nulls or only NaNs, or when its lower bound is >= lit. A NaN lower
+// bound is unreliable and never rules the file out. A non-primitive column
+// type panics.
+func (m *inclusiveMetricsEval) VisitLess(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+ lowerBound, err := iceberg.LiteralFromBytes(field.Type,
lowerBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if m.isNan(lowerBound) {
+ // nan indicates unreliable bounds
+ return rowsMightMatch
+ }
+
+ if getCmpLiteral(lowerBound)(lowerBound, lit) >= 0 {
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+// VisitLessEqual evaluates t <= lit. The file is ruled out when the column
+// holds only nulls or only NaNs, or when its lower bound is > lit. A NaN
+// lower bound is unreliable and never rules the file out.
+func (m *inclusiveMetricsEval) VisitLessEqual(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+ lowerBound, err := iceberg.LiteralFromBytes(field.Type,
lowerBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if m.isNan(lowerBound) {
+ // nan indicates unreliable bounds
+ return rowsMightMatch
+ }
+
+ if getCmpLiteral(lowerBound)(lowerBound, lit) > 0 {
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+// VisitGreater evaluates t > lit. The file is ruled out when the column
+// holds only nulls or only NaNs, or when its upper bound is <= lit. A NaN
+// upper bound is unreliable and never rules the file out.
+func (m *inclusiveMetricsEval) VisitGreater(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+ upperBound, err := iceberg.LiteralFromBytes(field.Type,
upperBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if getCmpLiteral(upperBound)(upperBound, lit) <= 0 {
+ // nan indicates unreliable bounds
+ if m.isNan(upperBound) {
+ return rowsMightMatch
+ }
+
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+// VisitGreaterEqual evaluates t >= lit. The file is ruled out when the
+// column holds only nulls or only NaNs, or when its upper bound is < lit. A
+// NaN upper bound is unreliable and never rules the file out.
+func (m *inclusiveMetricsEval) VisitGreaterEqual(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+ upperBound, err := iceberg.LiteralFromBytes(field.Type,
upperBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if getCmpLiteral(upperBound)(upperBound, lit) < 0 {
+ // nan indicates unreliable bounds
+ if m.isNan(upperBound) {
+ return rowsMightMatch
+ }
+
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+func (m *inclusiveMetricsEval) VisitEqual(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ var cmp func(iceberg.Literal, iceberg.Literal) int
+ if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+ lowerBound, err := iceberg.LiteralFromBytes(field.Type,
lowerBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if m.isNan(lowerBound) {
+ return rowsMightMatch
+ }
+
+ cmp = getCmpLiteral(lowerBound)
+ if cmp(lowerBound, lit) == 1 {
+ return rowsCannotMatch
+ }
+ }
+
+ if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+ upperBound, err := iceberg.LiteralFromBytes(field.Type,
upperBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if m.isNan(upperBound) {
+ return rowsMightMatch
+ }
+
+ if cmp(upperBound, lit) == -1 {
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+func (m *inclusiveMetricsEval) VisitNotEqual(iceberg.BoundTerm,
iceberg.Literal) bool {
+ return rowsMightMatch
+}
+
+func (m *inclusiveMetricsEval) VisitIn(t iceberg.BoundTerm, s
iceberg.Set[iceberg.Literal]) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if s.Len() > inPredicateLimit {
+ // skip evaluating the predicate if the number of values is too
big
+ return rowsMightMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ values := s.Members()
+ if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+ lowerBound, err := iceberg.LiteralFromBytes(field.Type,
lowerBoundBytes)
+ if err != nil {
+ panic(lowerBound)
+ }
+
+ if m.isNan(lowerBound) {
+ return rowsMightMatch
+ }
+
+ values = removeBoundCheck(lowerBound, values, 1)
+ if len(values) == 0 {
+ return rowsCannotMatch
+ }
+ }
+
+ if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+ upperBound, err := iceberg.LiteralFromBytes(field.Type,
upperBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ if m.isNan(upperBound) {
+ return rowsMightMatch
+ }
+
+ values = removeBoundCheck(upperBound, values, -1)
+ if len(values) == 0 {
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+func (m *inclusiveMetricsEval) VisitNotIn(iceberg.BoundTerm,
iceberg.Set[iceberg.Literal]) bool {
+ // because the bounds are not necessarily a min or max value, this
cannot be
+ // answered using them. notIn(col, {X, ...}) with (XX, Y) doesn't
guarantee that
+ // X is a value in col
+ return rowsMightMatch
+}
+
+func (m *inclusiveMetricsEval) VisitStartsWith(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.containsNullsOnly(fieldID) {
+ return rowsCannotMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ var prefix string
+ if val, ok := lit.(iceberg.TypedLiteral[string]); ok {
+ prefix = val.Value()
+ } else {
+ prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value())
+ }
+
+ lenPrefix := len(prefix)
+
+ if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+ lowerBound, err := iceberg.LiteralFromBytes(field.Type,
lowerBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ var v string
+ switch l := lowerBound.(type) {
+ case iceberg.TypedLiteral[string]:
+ v = l.Value()
+ case iceberg.TypedLiteral[[]byte]:
+ v = string(l.Value())
+ }
+
+ if len(v) > lenPrefix {
+ v = v[:lenPrefix]
+ }
+
+ if len(v) > 0 && v > prefix {
+ return rowsCannotMatch
+ }
+ }
+
+ if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+ upperBound, err := iceberg.LiteralFromBytes(field.Type,
upperBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ var v string
+ switch u := upperBound.(type) {
+ case iceberg.TypedLiteral[string]:
+ v = u.Value()
+ case iceberg.TypedLiteral[[]byte]:
+ v = string(u.Value())
+ }
+
+ if len(v) > lenPrefix {
+ v = v[:lenPrefix]
+ }
+
+ if len(v) > 0 && v < prefix {
+ return rowsCannotMatch
+ }
+ }
+
+ return rowsMightMatch
+}
+
+func (m *inclusiveMetricsEval) VisitNotStartsWith(t iceberg.BoundTerm, lit
iceberg.Literal) bool {
+ field := t.Ref().Field()
+ fieldID := field.ID
+
+ if m.mayContainNull(fieldID) {
+ return rowsMightMatch
+ }
+
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+ iceberg.ErrInvalidTypeString, field.Type))
+ }
+
+ // not_starts_with will match unless all values must start with the
prefix.
+ // this happens when the lower and upper bounds both start with the
prefix
+ lowerBoundBytes, upperBoundBytes := m.lowerBounds[fieldID],
m.upperBounds[fieldID]
+ if lowerBoundBytes != nil && upperBoundBytes != nil {
+ lowerBound, err := iceberg.LiteralFromBytes(field.Type,
lowerBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ upperBound, err := iceberg.LiteralFromBytes(field.Type,
upperBoundBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ var prefix, lower, upper string
+ if val, ok := lit.(iceberg.TypedLiteral[string]); ok {
+ prefix = val.Value()
+ lower, upper =
lowerBound.(iceberg.TypedLiteral[string]).Value(),
upperBound.(iceberg.TypedLiteral[string]).Value()
+ } else {
+ prefix =
string(lit.(iceberg.TypedLiteral[[]byte]).Value())
+ lower, upper =
string(lowerBound.(iceberg.TypedLiteral[[]byte]).Value()),
string(upperBound.(iceberg.TypedLiteral[[]byte]).Value())
+ }
+
+ lenPrefix := len(prefix)
+ if len(lower) < lenPrefix {
+ return rowsMightMatch
+ }
+
+ if lower[:lenPrefix] == prefix {
+ if len(upper) < lenPrefix {
+ return rowsMightMatch
+ }
+
+ if upper[:lenPrefix] == prefix {
+ return rowsCannotMatch
+ }
+ }
+ }
+
+ return rowsMightMatch
+}
diff --git a/table/evaluators_test.go b/table/evaluators_test.go
index b8c2671..a543e93 100644
--- a/table/evaluators_test.go
+++ b/table/evaluators_test.go
@@ -18,11 +18,13 @@
package table
import (
+ "math"
"testing"
"github.com/apache/iceberg-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/stretchr/testify/suite"
)
const (
@@ -503,3 +505,1295 @@ func TestManifestEvaluator(t *testing.T) {
}
})
}
+
+type ProjectionTestSuite struct {
+ suite.Suite
+}
+
+func (*ProjectionTestSuite) schema() *iceberg.Schema {
+ return iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "event_date", Type:
iceberg.PrimitiveTypes.Date},
+ iceberg.NestedField{ID: 4, Name: "event_ts", Type:
iceberg.PrimitiveTypes.Timestamp},
+ )
+}
+
+func (*ProjectionTestSuite) emptySpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec()
+}
+
+func (*ProjectionTestSuite) idSpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 1, FieldID: 1000,
+ Transform: iceberg.IdentityTransform{}, Name:
"id_part"},
+ )
+}
+
+func (*ProjectionTestSuite) bucketSpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 2, FieldID: 1000,
+ Transform: iceberg.BucketTransform{NumBuckets: 16},
Name: "data_bucket"},
+ )
+}
+
+func (*ProjectionTestSuite) daySpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000,
+ Transform: iceberg.DayTransform{}, Name: "date"},
+ iceberg.PartitionField{SourceID: 3, FieldID: 1001,
+ Transform: iceberg.DayTransform{}, Name: "ddate"},
+ )
+}
+
+func (*ProjectionTestSuite) hourSpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 4, FieldID: 1000,
+ Transform: iceberg.HourTransform{}, Name: "hour"},
+ )
+}
+
+func (*ProjectionTestSuite) truncateStrSpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 2, FieldID: 1000,
+ Transform: iceberg.TruncateTransform{Width: 2}, Name:
"data_trunc"},
+ )
+}
+
+func (*ProjectionTestSuite) truncateIntSpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 1, FieldID: 1000,
+ Transform: iceberg.TruncateTransform{Width: 10}, Name:
"id_trunc"},
+ )
+}
+
+func (*ProjectionTestSuite) idAndBucketSpec() iceberg.PartitionSpec {
+ return iceberg.NewPartitionSpec(
+ iceberg.PartitionField{SourceID: 1, FieldID: 1000,
+ Transform: iceberg.IdentityTransform{}, Name:
"id_part"},
+ iceberg.PartitionField{SourceID: 2, FieldID: 1001,
+ Transform: iceberg.BucketTransform{NumBuckets: 16},
Name: "data_bucket"},
+ )
+}
+
+func (p *ProjectionTestSuite) TestIdentityProjection() {
+ schema, spec := p.schema(), p.idSpec()
+
+ idRef, idPartRef := iceberg.Reference("id"),
iceberg.Reference("id_part")
+ tests := []struct {
+ pred iceberg.BooleanExpression
+ expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(idRef), iceberg.NotNull(idPartRef)},
+ {iceberg.IsNull(idRef), iceberg.IsNull(idPartRef)},
+ {iceberg.LessThan(idRef, int64(100)),
iceberg.LessThan(idPartRef, int64(100))},
+ {iceberg.LessThanEqual(idRef, int64(101)),
iceberg.LessThanEqual(idPartRef, int64(101))},
+ {iceberg.GreaterThan(idRef, int64(102)),
iceberg.GreaterThan(idPartRef, int64(102))},
+ {iceberg.GreaterThanEqual(idRef, int64(103)),
iceberg.GreaterThanEqual(idPartRef, int64(103))},
+ {iceberg.EqualTo(idRef, int64(104)), iceberg.EqualTo(idPartRef,
int64(104))},
+ {iceberg.NotEqualTo(idRef, int64(105)),
iceberg.NotEqualTo(idPartRef, int64(105))},
+ {iceberg.IsIn(idRef, int64(3), 4, 5), iceberg.IsIn(idPartRef,
int64(3), 4, 5)},
+ {iceberg.NotIn(idRef, int64(3), 4, 5), iceberg.NotIn(idPartRef,
int64(3), 4, 5)},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestBucketProjection() {
+ schema, spec := p.schema(), p.bucketSpec()
+
+ dataRef, dataBkt := iceberg.Reference("data"),
iceberg.Reference("data_bucket")
+ tests := []struct {
+ pred, expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(dataRef), iceberg.NotNull(dataBkt)},
+ {iceberg.IsNull(dataRef), iceberg.IsNull(dataBkt)},
+ {iceberg.LessThan(dataRef, "val"), iceberg.AlwaysTrue{}},
+ {iceberg.LessThanEqual(dataRef, "val"), iceberg.AlwaysTrue{}},
+ {iceberg.GreaterThan(dataRef, "val"), iceberg.AlwaysTrue{}},
+ {iceberg.GreaterThanEqual(dataRef, "val"),
iceberg.AlwaysTrue{}},
+ {iceberg.EqualTo(dataRef, "val"), iceberg.EqualTo(dataBkt,
int32(14))},
+ {iceberg.NotEqualTo(dataRef, "val"), iceberg.AlwaysTrue{}},
+ {iceberg.IsIn(dataRef, "v1", "v2", "v3"), iceberg.IsIn(dataBkt,
int32(1), 3, 13)},
+ {iceberg.NotIn(dataRef, "v1", "v2", "v3"),
iceberg.AlwaysTrue{}},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestHourProjection() {
+ schema, spec := p.schema(), p.hourSpec()
+
+ ref, hour := iceberg.Reference("event_ts"), iceberg.Reference("hour")
+ tests := []struct {
+ pred, expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(ref), iceberg.NotNull(hour)},
+ {iceberg.IsNull(ref), iceberg.IsNull(hour)},
+ {iceberg.LessThan(ref, "2022-11-27T10:00:00"),
iceberg.LessThanEqual(hour, int32(463761))},
+ {iceberg.LessThanEqual(ref, "2022-11-27T10:00:00"),
iceberg.LessThanEqual(hour, int32(463762))},
+ {iceberg.GreaterThan(ref, "2022-11-27T09:59:59.999999"),
iceberg.GreaterThanEqual(hour, int32(463762))},
+ {iceberg.GreaterThanEqual(ref, "2022-11-27T09:59:59.999999"),
iceberg.GreaterThanEqual(hour, int32(463761))},
+ {iceberg.EqualTo(ref, "2022-11-27T10:00:00"),
iceberg.EqualTo(hour, int32(463762))},
+ {iceberg.NotEqualTo(ref, "2022-11-27T10:00:00"),
iceberg.AlwaysTrue{}},
+ {iceberg.IsIn(ref, "2022-11-27T10:00:00",
"2022-11-27T09:59:59.999999"), iceberg.IsIn(hour, int32(463761), 463762)},
+ {iceberg.NotIn(ref, "2022-11-27T10:00:00",
"2022-11-27T09:59:59.999999"), iceberg.AlwaysTrue{}},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestDayProjection() {
+ schema, spec := p.schema(), p.daySpec()
+
+ ref, date := iceberg.Reference("event_ts"), iceberg.Reference("date")
+ tests := []struct {
+ pred, expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(ref), iceberg.NotNull(date)},
+ {iceberg.IsNull(ref), iceberg.IsNull(date)},
+ {iceberg.LessThan(ref, "2022-11-27T00:00:00"),
iceberg.LessThanEqual(date, int32(19322))},
+ {iceberg.LessThanEqual(ref, "2022-11-27T00:00:00"),
iceberg.LessThanEqual(date, int32(19323))},
+ {iceberg.GreaterThan(ref, "2022-11-26T23:59:59.999999"),
iceberg.GreaterThanEqual(date, int32(19323))},
+ {iceberg.GreaterThanEqual(ref, "2022-11-26T23:59:59.999999"),
iceberg.GreaterThanEqual(date, int32(19322))},
+ {iceberg.EqualTo(ref, "2022-11-27T10:00:00"),
iceberg.EqualTo(date, int32(19323))},
+ {iceberg.NotEqualTo(ref, "2022-11-27T10:00:00"),
iceberg.AlwaysTrue{}},
+ {iceberg.IsIn(ref, "2022-11-27T00:00:00",
"2022-11-26T23:59:59.999999"), iceberg.IsIn(date, int32(19322), 19323)},
+ {iceberg.NotIn(ref, "2022-11-27T00:00:00",
"2022-11-26T23:59:59.999999"), iceberg.AlwaysTrue{}},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestDateDayProjection() {
+ schema, spec := p.schema(), p.daySpec()
+
+ ref, date := iceberg.Reference("event_date"), iceberg.Reference("ddate")
+ tests := []struct {
+ pred, expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(ref), iceberg.NotNull(date)},
+ {iceberg.IsNull(ref), iceberg.IsNull(date)},
+ {iceberg.LessThan(ref, "2022-11-27"),
iceberg.LessThanEqual(date, int32(19322))},
+ {iceberg.LessThanEqual(ref, "2022-11-27"),
iceberg.LessThanEqual(date, int32(19323))},
+ {iceberg.GreaterThan(ref, "2022-11-26"),
iceberg.GreaterThanEqual(date, int32(19323))},
+ {iceberg.GreaterThanEqual(ref, "2022-11-26"),
iceberg.GreaterThanEqual(date, int32(19322))},
+ {iceberg.EqualTo(ref, "2022-11-27"), iceberg.EqualTo(date,
int32(19323))},
+ {iceberg.NotEqualTo(ref, "2022-11-27"), iceberg.AlwaysTrue{}},
+ {iceberg.IsIn(ref, "2022-11-27", "2022-11-26"),
iceberg.IsIn(date, int32(19322), 19323)},
+ {iceberg.NotIn(ref, "2022-11-27", "2022-11-26"),
iceberg.AlwaysTrue{}},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestStringTruncateProjection() {
+ schema, spec := p.schema(), p.truncateStrSpec()
+
+ ref, truncStr := iceberg.Reference("data"),
iceberg.Reference("data_trunc")
+ tests := []struct {
+ pred, expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(ref), iceberg.NotNull(truncStr)},
+ {iceberg.IsNull(ref), iceberg.IsNull(truncStr)},
+ {iceberg.LessThan(ref, "aaa"), iceberg.LessThanEqual(truncStr,
"aa")},
+ {iceberg.LessThanEqual(ref, "aaa"),
iceberg.LessThanEqual(truncStr, "aa")},
+ {iceberg.GreaterThan(ref, "aaa"),
iceberg.GreaterThanEqual(truncStr, "aa")},
+ {iceberg.GreaterThanEqual(ref, "aaa"),
iceberg.GreaterThanEqual(truncStr, "aa")},
+ {iceberg.EqualTo(ref, "aaa"), iceberg.EqualTo(truncStr, "aa")},
+ {iceberg.NotEqualTo(ref, "aaa"), iceberg.AlwaysTrue{}},
+ {iceberg.IsIn(ref, "aaa", "aab"), iceberg.EqualTo(truncStr,
"aa")},
+ {iceberg.NotIn(ref, "aaa", "aab"), iceberg.AlwaysTrue{}},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestIntTruncateProjection() {
+ schema, spec := p.schema(), p.truncateIntSpec()
+
+ ref, idTrunc := iceberg.Reference("id"), iceberg.Reference("id_trunc")
+ tests := []struct {
+ pred, expected iceberg.BooleanExpression
+ }{
+ {iceberg.NotNull(ref), iceberg.NotNull(idTrunc)},
+ {iceberg.IsNull(ref), iceberg.IsNull(idTrunc)},
+ {iceberg.LessThan(ref, int32(10)),
iceberg.LessThanEqual(idTrunc, int64(0))},
+ {iceberg.LessThanEqual(ref, int32(10)),
iceberg.LessThanEqual(idTrunc, int64(10))},
+ {iceberg.GreaterThan(ref, int32(9)),
iceberg.GreaterThanEqual(idTrunc, int64(10))},
+ {iceberg.GreaterThanEqual(ref, int32(10)),
iceberg.GreaterThanEqual(idTrunc, int64(10))},
+ {iceberg.EqualTo(ref, int32(15)), iceberg.EqualTo(idTrunc,
int64(10))},
+ {iceberg.NotEqualTo(ref, int32(15)), iceberg.AlwaysTrue{}},
+ {iceberg.IsIn(ref, int32(15), 16), iceberg.EqualTo(idTrunc,
int64(10))},
+ {iceberg.NotIn(ref, int32(15), 16), iceberg.AlwaysTrue{}},
+ }
+
+ project := newInclusiveProjection(schema, spec, true)
+ for _, tt := range tests {
+ p.Run(tt.pred.String(), func() {
+ expr, err := project(tt.pred)
+ p.Require().NoError(err)
+ p.Truef(tt.expected.Equals(expr), "expected: %s\ngot:
%s", tt.expected, expr)
+ })
+ }
+}
+
+func (p *ProjectionTestSuite) TestProjectionCaseSensitive() {
+ schema, spec := p.schema(), p.idSpec()
+ project := newInclusiveProjection(schema, spec, true)
+ _, err := project(iceberg.NotNull(iceberg.Reference("ID")))
+ p.ErrorIs(err, iceberg.ErrInvalidSchema)
+ p.ErrorContains(err, "could not bind reference 'ID',
caseSensitive=true")
+}
+
+func (p *ProjectionTestSuite) TestProjectionCaseInsensitive() {
+ schema, spec := p.schema(), p.idSpec()
+ project := newInclusiveProjection(schema, spec, false)
+ expr, err := project(iceberg.NotNull(iceberg.Reference("ID")))
+ p.Require().NoError(err)
+ p.True(expr.Equals(iceberg.NotNull(iceberg.Reference("id_part"))))
+}
+
+func (p *ProjectionTestSuite) TestProjectEmptySpec() {
+ project := newInclusiveProjection(p.schema(), p.emptySpec(), true)
+ expr, err :=
project(iceberg.NewAnd(iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+ iceberg.NotNull(iceberg.Reference("data"))))
+ p.Require().NoError(err)
+ p.Equal(iceberg.AlwaysTrue{}, expr)
+}
+
+func (p *ProjectionTestSuite) TestAndProjectionMultipleFields() {
+ project := newInclusiveProjection(p.schema(), p.idAndBucketSpec(), true)
+ expr, err :=
project(iceberg.NewAnd(iceberg.LessThan(iceberg.Reference("id"),
+ int32(5)), iceberg.IsIn(iceberg.Reference("data"), "a", "b",
"c")))
+ p.Require().NoError(err)
+
+
p.True(expr.Equals(iceberg.NewAnd(iceberg.LessThan(iceberg.Reference("id_part"),
int64(5)),
+ iceberg.IsIn(iceberg.Reference("data_bucket"), int32(2), 3,
15))))
+}
+
+func (p *ProjectionTestSuite) TestOrProjectionMultipleFields() {
+ project := newInclusiveProjection(p.schema(), p.idAndBucketSpec(), true)
+ expr, err :=
project(iceberg.NewOr(iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+ iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c")))
+ p.Require().NoError(err)
+
+
p.True(expr.Equals(iceberg.NewOr(iceberg.LessThan(iceberg.Reference("id_part"),
int64(5)),
+ iceberg.IsIn(iceberg.Reference("data_bucket"), int32(2), 3,
15))))
+}
+
+func (p *ProjectionTestSuite) TestNotProjectionMultipleFields() {
+ project := newInclusiveProjection(p.schema(), p.idAndBucketSpec(), true)
+ // not causes In to be rewritten to NotIn, which cannot be projected
+ expr, err :=
project(iceberg.NewNot(iceberg.NewOr(iceberg.LessThan(iceberg.Reference("id"),
int64(5)),
+ iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c"))))
+ p.Require().NoError(err)
+
+
p.True(expr.Equals(iceberg.GreaterThanEqual(iceberg.Reference("id_part"),
int64(5))))
+}
+
+func (p *ProjectionTestSuite) TestPartialProjectedFields() {
+ project := newInclusiveProjection(p.schema(), p.idSpec(), true)
+ expr, err :=
project(iceberg.NewAnd(iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+ iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c")))
+ p.Require().NoError(err)
+ p.True(expr.Equals(iceberg.LessThan(iceberg.Reference("id_part"),
int64(5))))
+}
+
+type mockDataFile struct {
+ path string
+ format iceberg.FileFormat
+ partition map[string]any
+ count int64
+ columnSizes map[int]int64
+ filesize int64
+ valueCounts map[int]int64
+ nullCounts map[int]int64
+ nanCounts map[int]int64
+ lowerBounds map[int][]byte
+ upperBounds map[int][]byte
+}
+
+func (*mockDataFile) ContentType() iceberg.ManifestEntryContent { return
iceberg.EntryContentData }
+func (m *mockDataFile) FilePath() string { return
m.path }
+func (m *mockDataFile) FileFormat() iceberg.FileFormat { return
m.format }
+func (m *mockDataFile) Partition() map[string]any { return
m.partition }
+func (m *mockDataFile) Count() int64 { return
m.count }
+func (m *mockDataFile) FileSizeBytes() int64 { return
m.filesize }
+func (m *mockDataFile) ColumnSizes() map[int]int64 { return
m.columnSizes }
+func (m *mockDataFile) ValueCounts() map[int]int64 { return
m.valueCounts }
+func (m *mockDataFile) NullValueCounts() map[int]int64 { return
m.nullCounts }
+func (m *mockDataFile) NaNValueCounts() map[int]int64 { return
m.nanCounts }
+func (*mockDataFile) DistinctValueCounts() map[int]int64 { return nil }
+func (m *mockDataFile) LowerBoundValues() map[int][]byte { return
m.lowerBounds }
+func (m *mockDataFile) UpperBoundValues() map[int][]byte { return
m.upperBounds }
+func (*mockDataFile) KeyMetadata() []byte { return nil }
+func (*mockDataFile) SplitOffsets() []int64 { return nil }
+func (*mockDataFile) EqualityFieldIDs() []int { return nil }
+func (*mockDataFile) SortOrderID() *int { return nil }
+
+type InclusiveMetricsTestSuite struct {
+ suite.Suite
+
+ schemaDataFile *iceberg.Schema
+ dataFiles [4]iceberg.DataFile
+
+ schemaDataFileNan *iceberg.Schema
+ dataFileNan iceberg.DataFile
+}
+
+func (suite *InclusiveMetricsTestSuite) SetupSuite() {
+ suite.schemaDataFile = iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 2, Name: "no_stats", Type:
iceberg.PrimitiveTypes.Int32, Required: false},
+ iceberg.NestedField{ID: 3, Name: "required", Type:
iceberg.PrimitiveTypes.String, Required: true},
+ iceberg.NestedField{ID: 4, Name: "all_nulls", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 5, Name: "some_nulls", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 6, Name: "no_nulls", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 7, Name: "all_nans", Type:
iceberg.PrimitiveTypes.Float64},
+ iceberg.NestedField{ID: 8, Name: "some_nans", Type:
iceberg.PrimitiveTypes.Float32},
+ iceberg.NestedField{ID: 9, Name: "no_nans", Type:
iceberg.PrimitiveTypes.Float32},
+ iceberg.NestedField{ID: 10, Name: "all_nulls_double", Type:
iceberg.PrimitiveTypes.Float64},
+ iceberg.NestedField{ID: 11, Name: "all_nans_v1_stats", Type:
iceberg.PrimitiveTypes.Float32},
+ iceberg.NestedField{ID: 12, Name: "nan_and_null_only", Type:
iceberg.PrimitiveTypes.Float64},
+ iceberg.NestedField{ID: 13, Name: "no_nan_stats", Type:
iceberg.PrimitiveTypes.Float64},
+ iceberg.NestedField{ID: 14, Name: "some_empty", Type:
iceberg.PrimitiveTypes.String},
+ )
+
+ var (
+ IntMin, _ = iceberg.Int32Literal(IntMinValue).MarshalBinary()
+ IntMax, _ = iceberg.Int32Literal(IntMaxValue).MarshalBinary()
+ FltNan, _ =
iceberg.Float32Literal(float32(math.NaN())).MarshalBinary()
+ DblNan, _ = iceberg.Float64Literal(math.NaN()).MarshalBinary()
+ FltSeven, _ = iceberg.Float32Literal(7).MarshalBinary()
+ DblSeven, _ = iceberg.Float64Literal(7).MarshalBinary()
+ FltMax, _ = iceberg.Float32Literal(22).MarshalBinary()
+ )
+
+ suite.dataFiles = [4]iceberg.DataFile{
+ &mockDataFile{
+ path: "file_1.parquet",
+ format: iceberg.ParquetFile,
+ count: 50,
+ filesize: 3,
+ valueCounts: map[int]int64{
+ 4: 50, 5: 50, 6: 50, 7: 50, 8: 50, 9: 50,
+ 10: 50, 11: 50, 12: 50, 13: 50, 14: 50,
+ },
+ nullCounts: map[int]int64{4: 50, 5: 10, 6: 0, 10: 50,
11: 0, 12: 1, 14: 8},
+ nanCounts: map[int]int64{7: 50, 8: 10, 9: 0},
+ lowerBounds: map[int][]byte{
+ 1: IntMin,
+ 11: FltNan,
+ 12: DblNan,
+ 14: {},
+ },
+ upperBounds: map[int][]byte{
+ 1: IntMax,
+ 11: FltNan,
+ 12: DblNan,
+ 14: []byte("房东整租霍营小区二层两居室"),
+ },
+ },
+ &mockDataFile{
+ path: "file_2.parquet",
+ format: iceberg.ParquetFile,
+ count: 50,
+ filesize: 3,
+ valueCounts: map[int]int64{3: 20},
+ nullCounts: map[int]int64{3: 2},
+ nanCounts: nil,
+ lowerBounds: map[int][]byte{3: {'a', 'a'}},
+ upperBounds: map[int][]byte{3: {'d', 'C'}},
+ },
+ &mockDataFile{
+ path: "file_3.parquet",
+ format: iceberg.ParquetFile,
+ count: 50,
+ filesize: 3,
+ valueCounts: map[int]int64{3: 20},
+ nullCounts: map[int]int64{3: 2},
+ nanCounts: nil,
+ lowerBounds: map[int][]byte{3: []byte("1str1")},
+ upperBounds: map[int][]byte{3: []byte("3str3")},
+ },
+ &mockDataFile{
+ path: "file_4.parquet",
+ format: iceberg.ParquetFile,
+ count: 50,
+ filesize: 3,
+ valueCounts: map[int]int64{3: 20},
+ nullCounts: map[int]int64{3: 2},
+ nanCounts: nil,
+ lowerBounds: map[int][]byte{3: []byte("abc")},
+ upperBounds: map[int][]byte{3: []byte("イロハニホヘト")},
+ },
+ }
+
+ suite.schemaDataFileNan = iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "all_nan", Type:
iceberg.PrimitiveTypes.Float64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "max_nan", Type:
iceberg.PrimitiveTypes.Float64, Required: true},
+ iceberg.NestedField{ID: 3, Name: "min_max_nan", Type:
iceberg.PrimitiveTypes.Float32},
+ iceberg.NestedField{ID: 4, Name: "all_nan_null_bounds", Type:
iceberg.PrimitiveTypes.Float64, Required: true},
+ iceberg.NestedField{ID: 5, Name: "some_nan_correct_bounds",
Type: iceberg.PrimitiveTypes.Float32},
+ )
+
+ suite.dataFileNan = &mockDataFile{
+ path: "file.avro",
+ format: iceberg.AvroFile,
+ count: 50,
+ filesize: 3,
+ columnSizes: map[int]int64{1: 10, 2: 10, 3: 10, 4: 10, 5: 10},
+ valueCounts: map[int]int64{1: 10, 2: 10, 3: 10, 4: 10, 5: 10},
+ nullCounts: map[int]int64{1: 0, 2: 0, 3: 0, 4: 0, 5: 0},
+ nanCounts: map[int]int64{1: 10, 4: 10, 5: 5},
+ lowerBounds: map[int][]byte{
+ 1: DblNan,
+ 2: DblSeven,
+ 3: FltNan,
+ 5: FltSeven,
+ },
+ upperBounds: map[int][]byte{
+ 1: DblNan,
+ 2: DblNan,
+ 3: FltNan,
+ 5: FltMax,
+ },
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestAllNull() {
+ allNull, someNull, noNull := iceberg.Reference("all_nulls"),
iceberg.Reference("some_nulls"), iceberg.Reference("no_nulls")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotNull(allNull), false, "should skip: no non-null
value in all null column"},
+ {iceberg.LessThan(allNull, "a"), false, "should skip: lessThan
on all null column"},
+ {iceberg.LessThanEqual(allNull, "a"), false, "should skip:
lessThanEqual on all null column"},
+ {iceberg.GreaterThan(allNull, "a"), false, "should skip:
greaterThan on all null column"},
+ {iceberg.GreaterThanEqual(allNull, "a"), false, "should skip:
greaterThanEqual on all null column"},
+ {iceberg.EqualTo(allNull, "a"), false, "should skip: equal on
all null column"},
+ {iceberg.NotNull(someNull), true, "should read: column with
some nulls contains a non-null value"},
+ {iceberg.NotNull(noNull), true, "should read: non-null column
contains a non-null value"},
+ {iceberg.StartsWith(allNull, "asad"), false, "should skip:
starts with on all null column"},
+ {iceberg.NotStartsWith(allNull, "asad"), true, "should read:
notStartsWith on all null column"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNoNulls() {
+ allNull, someNull, noNull := iceberg.Reference("all_nulls"),
iceberg.Reference("some_nulls"), iceberg.Reference("no_nulls")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.IsNull(allNull), true, "should read: at least one null
value in all null column"},
+ {iceberg.IsNull(someNull), true, "should read: column with some
nulls contains a null value"},
+ {iceberg.IsNull(noNull), false, "should skip: non-null column
contains no null values"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIsNan() {
+ allNan, someNan, noNan := iceberg.Reference("all_nans"),
iceberg.Reference("some_nans"), iceberg.Reference("no_nans")
+ allNullsDbl, noNanStats := iceberg.Reference("all_nulls_double"),
iceberg.Reference("no_nan_stats")
+ allNansV1, nanNullOnly := iceberg.Reference("all_nans_v1_stats"),
iceberg.Reference("nan_and_null_only")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.IsNaN(allNan), true, "should read: at least one nan
value in all nan column"},
+ {iceberg.IsNaN(someNan), true, "should read: at least one nan
value in some nan column"},
+ {iceberg.IsNaN(noNan), false, "should skip: no-nans column has
no nans"},
+ {iceberg.IsNaN(allNullsDbl), false, "should skip: all-null
column doesn't contain nan values"},
+ {iceberg.IsNaN(noNanStats), true, "should read: no guarantee if
contains nan without stats"},
+ {iceberg.IsNaN(allNansV1), true, "should read: at least one nan
value in all nan column"},
+ {iceberg.IsNaN(nanNullOnly), true, "should read: at least one
nan value in nan and nulls only column"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNotNaN() {
+ allNan, someNan, noNan := iceberg.Reference("all_nans"),
iceberg.Reference("some_nans"), iceberg.Reference("no_nans")
+ allNullsDbl, noNanStats := iceberg.Reference("all_nulls_double"),
iceberg.Reference("no_nan_stats")
+ allNansV1, nanNullOnly := iceberg.Reference("all_nans_v1_stats"),
iceberg.Reference("nan_and_null_only")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotNaN(allNan), false, "should skip: column with all
nans will not contain non-nan"},
+ {iceberg.NotNaN(someNan), true, "should read: at least one
non-nan value in some nan column"},
+ {iceberg.NotNaN(noNan), true, "should read: at least one
non-nan value in no nan column"},
+ {iceberg.NotNaN(allNullsDbl), true, "should read: at least one
non-nan value in all null column"},
+ {iceberg.NotNaN(noNanStats), true, "should read: no guarantee
if contains nan without stats"},
+ {iceberg.NotNaN(allNansV1), true, "should read: no guarantee"},
+ {iceberg.NotNaN(nanNullOnly), true, "should read: at least one
null value in nan and nulls only column"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestRequiredColumn() {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotNull(iceberg.Reference("required")), true, "should
read: required columns are always non-null"},
+ {iceberg.IsNull(iceberg.Reference("required")), false, "should
skip: required columns are always non-null"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestMissingColumn() {
+ _, err := newInclusiveMetricsEvaluator(suite.schemaDataFile,
iceberg.LessThan(iceberg.Reference("missing"), int32(22)), true, true)
+ suite.ErrorIs(err, iceberg.ErrInvalidSchema)
+}
+
+func (suite *InclusiveMetricsTestSuite) TestMissingStats() {
+ noStatsSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 2, Name: "no_stats", Type:
iceberg.PrimitiveTypes.Float64})
+
+ noStatsFile := &mockDataFile{
+ path: "file_1.parquet",
+ format: iceberg.ParquetFile,
+ count: 50,
+ }
+
+ ref := iceberg.Reference("no_stats")
+ tests := []iceberg.BooleanExpression{
+ iceberg.LessThan(ref, int32(5)),
+ iceberg.LessThanEqual(ref, int32(30)),
+ iceberg.EqualTo(ref, int32(70)),
+ iceberg.GreaterThan(ref, int32(78)),
+ iceberg.GreaterThanEqual(ref, int32(90)),
+ iceberg.NotEqualTo(ref, int32(101)),
+ iceberg.IsNull(ref),
+ iceberg.NotNull(ref),
+ iceberg.IsNaN(ref),
+ iceberg.NotNaN(ref),
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(noStatsSchema, tt, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(noStatsFile)
+ suite.Require().NoError(err)
+ suite.True(shouldRead, "should read when stats are
missing")
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestZeroRecordFileStats() {
+ zeroRecordFile := &mockDataFile{
+ path: "file_1.parquet",
+ format: iceberg.ParquetFile,
+ count: 0,
+ }
+
+ ref := iceberg.Reference("no_stats")
+ tests := []iceberg.BooleanExpression{
+ iceberg.LessThan(ref, int32(5)),
+ iceberg.LessThanEqual(ref, int32(30)),
+ iceberg.EqualTo(ref, int32(70)),
+ iceberg.GreaterThan(ref, int32(78)),
+ iceberg.GreaterThanEqual(ref, int32(90)),
+ iceberg.NotEqualTo(ref, int32(101)),
+ iceberg.IsNull(ref),
+ iceberg.NotNull(ref),
+ iceberg.IsNaN(ref),
+ iceberg.NotNaN(ref),
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt, true, false)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(zeroRecordFile)
+ suite.Require().NoError(err)
+ suite.False(shouldRead, "should skip datafile without
records")
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNot() {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NewNot(iceberg.LessThan(iceberg.Reference("id"),
IntMinValue-25)), true, "should read: not(false)"},
+ {iceberg.NewNot(iceberg.GreaterThan(iceberg.Reference("id"),
IntMinValue-25)), false, "should skip: not(true)"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestAnd() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NewAnd(
+ iceberg.LessThan(ref, IntMinValue-25),
+ iceberg.GreaterThanEqual(ref, IntMinValue-30)), false,
"should skip: and(false, true)"},
+ {iceberg.NewAnd(
+ iceberg.LessThan(ref, IntMinValue-25),
+ iceberg.GreaterThanEqual(ref, IntMinValue+1)), false,
"should skip: and(false, false)"},
+ {iceberg.NewAnd(
+ iceberg.GreaterThan(ref, IntMinValue-25),
+ iceberg.LessThanEqual(ref, IntMinValue)), true, "should
read: and(true, true)"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestOr() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NewOr(
+ iceberg.LessThan(ref, IntMinValue-25),
+ iceberg.GreaterThanEqual(ref, IntMaxValue+1)), false,
"should skip: or(false, false)"},
+ {iceberg.NewOr(
+ iceberg.LessThan(ref, IntMinValue-25),
+ iceberg.GreaterThanEqual(ref, IntMaxValue-19)), true,
"should read: or(false, true)"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntLt() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.LessThan(ref, IntMinValue-25), false, "should skip: id
range below lower bound (5 < 30)"},
+ {iceberg.LessThan(ref, IntMinValue), false, "should skip: id
range below lower bound (30 is not < 30)"},
+ {iceberg.LessThan(ref, IntMinValue+1), true, "should read: one
possible id"},
+ {iceberg.LessThan(ref, IntMaxValue), true, "should read: many
possible ids"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntLtEq() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.LessThanEqual(ref, IntMinValue-25), false, "should
skip: id range below lower bound (5 < 30)"},
+ {iceberg.LessThanEqual(ref, IntMinValue-1), false, "should
skip: id range below lower bound (29 is not <= 30)"},
+ {iceberg.LessThanEqual(ref, IntMinValue), true, "should read:
one possible id"},
+ {iceberg.LessThanEqual(ref, IntMaxValue), true, "should read:
many possible ids"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntGt() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.GreaterThan(ref, IntMaxValue+6), false, "should skip:
id range above upper bound (85 > 79)"},
+ {iceberg.GreaterThan(ref, IntMaxValue), false, "should skip: id
range above upper bound (79 is not > 79)"},
+ {iceberg.GreaterThan(ref, IntMinValue-1), true, "should read:
one possible id"},
+ {iceberg.GreaterThan(ref, IntMaxValue-4), true, "should read:
many possible ids"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntGtEq() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.GreaterThanEqual(ref, IntMaxValue+6), false, "should
skip: id range above upper bound (85 < 79)"},
+ {iceberg.GreaterThanEqual(ref, IntMaxValue+1), false, "should
skip: id range above upper bound (80 > 79)"},
+ {iceberg.GreaterThanEqual(ref, IntMaxValue), true, "should
read: one possible id"},
+ {iceberg.GreaterThanEqual(ref, IntMaxValue-4), true, "should
read: many possible ids"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntEq() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.EqualTo(ref, IntMinValue-25), false, "should skip: id
range below lower bound"},
+ {iceberg.EqualTo(ref, IntMinValue-1), false, "should skip: id
range below lower bound"},
+ {iceberg.EqualTo(ref, IntMinValue), true, "should read: id
equal to lower bound"},
+ {iceberg.EqualTo(ref, IntMaxValue-4), true, "should read: id
between lower and upper bounds"},
+ {iceberg.EqualTo(ref, IntMaxValue), true, "should read: id
equal to upper bound"},
+ {iceberg.EqualTo(ref, IntMaxValue+1), false, "should skip: id
above upper bound"},
+ {iceberg.EqualTo(ref, IntMaxValue+6), false, "should skip: id
above upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntNeq() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotEqualTo(ref, IntMinValue-25), true, "should read:
id range below lower bound"},
+ {iceberg.NotEqualTo(ref, IntMinValue-1), true, "should read: id
range below lower bound"},
+ {iceberg.NotEqualTo(ref, IntMinValue), true, "should read: id
equal to lower bound"},
+ {iceberg.NotEqualTo(ref, IntMaxValue-4), true, "should read: id
between lower and upper bounds"},
+ {iceberg.NotEqualTo(ref, IntMaxValue), true, "should read: id
equal to upper bound"},
+ {iceberg.NotEqualTo(ref, IntMaxValue+1), true, "should read: id
above upper bound"},
+ {iceberg.NotEqualTo(ref, IntMaxValue+6), true, "should read: id
above upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntNeqRewritten() {
+ ref := iceberg.Reference("id")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.EqualTo(ref, IntMinValue-25), true, "should read: id
range below lower bound"},
+ {iceberg.EqualTo(ref, IntMinValue-1), true, "should read: id
range below lower bound"},
+ {iceberg.EqualTo(ref, IntMinValue), true, "should read: id
equal to lower bound"},
+ {iceberg.EqualTo(ref, IntMaxValue-4), true, "should read: id
between lower and upper bounds"},
+ {iceberg.EqualTo(ref, IntMaxValue), true, "should read: id
equal to upper bound"},
+ {iceberg.EqualTo(ref, IntMaxValue+1), true, "should read: id
above upper bound"},
+ {iceberg.EqualTo(ref, IntMaxValue+6), true, "should read: id
above upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, iceberg.NewNot(tt.expr),
true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntNeqRewrittenCaseInsensitive() {
+ ref := iceberg.Reference("ID")
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.EqualTo(ref, IntMinValue-25), true, "should read: id
range below lower bound"},
+ {iceberg.EqualTo(ref, IntMinValue-1), true, "should read: id
range below lower bound"},
+ {iceberg.EqualTo(ref, IntMinValue), true, "should read: id
equal to lower bound"},
+ {iceberg.EqualTo(ref, IntMaxValue-4), true, "should read: id
between lower and upper bounds"},
+ {iceberg.EqualTo(ref, IntMaxValue), true, "should read: id
equal to upper bound"},
+ {iceberg.EqualTo(ref, IntMaxValue+1), true, "should read: id
above upper bound"},
+ {iceberg.EqualTo(ref, IntMaxValue+6), true, "should read: id
above upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, iceberg.NewNot(tt.expr),
false, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestInMetrics() {
+ ref := iceberg.Reference("id")
+
+ ids := make([]int32, 400)
+ for i := range ids {
+ ids[i] = int32(i)
+ }
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.IsIn(ref, IntMinValue-25, IntMinValue-24), false,
"should skip: id below lower bound"},
+ {iceberg.IsIn(ref, IntMinValue-2, IntMinValue-1), false,
"should skip: id below lower bound"},
+ {iceberg.IsIn(ref, IntMinValue-1, IntMinValue), true, "should
read: id equal to lower bound"},
+ {iceberg.IsIn(ref, IntMaxValue-4, IntMaxValue-3), true, "should
read: id between upper and lower bounds"},
+ {iceberg.IsIn(ref, IntMaxValue, IntMaxValue+1), true, "should
read: id equal to upper bound"},
+ {iceberg.IsIn(ref, IntMaxValue+1, IntMaxValue+2), false,
"should skip: id above upper bound"},
+ {iceberg.IsIn(ref, IntMaxValue+6, IntMaxValue+7), false,
"should skip: id above upper bound"},
+ {iceberg.IsIn(iceberg.Reference("all_nulls"), "abc", "def"),
false, "should skip: in on all nulls column"},
+ {iceberg.IsIn(iceberg.Reference("some_nulls"), "abc", "def"),
true, "should read: in on some nulls column"},
+ {iceberg.IsIn(iceberg.Reference("no_nulls"), "abc", "def"),
true, "should read: in on no nulls column"},
+ {iceberg.IsIn(ref, ids...), true, "should read: large in
expression"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNotInMetrics() {
+ ref := iceberg.Reference("id")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotIn(ref, IntMinValue-25, IntMinValue-24), true,
"should read: id below lower bound"},
+ {iceberg.NotIn(ref, IntMinValue-2, IntMinValue-1), true,
"should read: id below lower bound"},
+ {iceberg.NotIn(ref, IntMinValue-1, IntMinValue), true, "should
read: id equal to lower bound"},
+ {iceberg.NotIn(ref, IntMaxValue-4, IntMaxValue-3), true,
"should read: id between upper and lower bounds"},
+ {iceberg.NotIn(ref, IntMaxValue, IntMaxValue+1), true, "should
read: id equal to upper bound"},
+ {iceberg.NotIn(ref, IntMaxValue+1, IntMaxValue+2), true,
"should read: id above upper bound"},
+ {iceberg.NotIn(ref, IntMaxValue+6, IntMaxValue+7), true,
"should read: id above upper bound"},
+ {iceberg.NotIn(iceberg.Reference("all_nulls"), "abc", "def"),
true, "should read: in on all nulls column"},
+ {iceberg.NotIn(iceberg.Reference("some_nulls"), "abc", "def"),
true, "should read: in on some nulls column"},
+ {iceberg.NotIn(iceberg.Reference("no_nulls"), "abc", "def"),
true, "should read: in on no nulls column"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFiles[0])
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestLessAndLessEqualNans() {
+ type Op func(iceberg.UnboundTerm, int32) iceberg.UnboundPredicate
+ for _, operator := range []Op{iceberg.LessThan[int32],
iceberg.LessThanEqual[int32]} {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {operator(iceberg.Reference("all_nan"), int32(1)),
false, "should skip: all nan column doesn't contain number"},
+ {operator(iceberg.Reference("max_nan"), int32(1)),
false, "should skip: 1 is smaller than lower bound"},
+ {operator(iceberg.Reference("max_nan"), int32(10)),
true, "should read: 10 is larger than lower bound"},
+ {operator(iceberg.Reference("min_max_nan"), int32(1)),
true, "should read: no visibility"},
+ {operator(iceberg.Reference("all_nan_null_bounds"),
int32(1)), false, "should skip: all nan column doesn't contain number"},
+ {operator(iceberg.Reference("some_nan_correct_bounds"),
int32(1)), false, "should skip: 1 is smaller than lower bound"},
+ {operator(iceberg.Reference("some_nan_correct_bounds"),
int32(10)), true, "should read: 10 is larger than lower bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFileNan)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestGreaterAndGreaterEqualNans() {
+ type Op func(iceberg.UnboundTerm, int32) iceberg.UnboundPredicate
+ for _, operator := range []Op{iceberg.GreaterThan[int32],
iceberg.GreaterThanEqual[int32]} {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {operator(iceberg.Reference("all_nan"), int32(1)),
false, "should skip: all nan column doesn't contain number"},
+ {operator(iceberg.Reference("max_nan"), int32(1)),
true, "should read: upper bound is larger than 1"},
+ {operator(iceberg.Reference("max_nan"), int32(10)),
true, "should read: 10 is smaller than upper bound"},
+ {operator(iceberg.Reference("min_max_nan"), int32(1)),
true, "should read: no visibility"},
+ {operator(iceberg.Reference("all_nan_null_bounds"),
int32(1)), false, "should skip: all nan column doesn't contain number"},
+ {operator(iceberg.Reference("some_nan_correct_bounds"),
int32(1)), true, "should read: 1 is smaller than upper bound"},
+ {operator(iceberg.Reference("some_nan_correct_bounds"),
int32(10)), true, "should read: 10 is smaller than upper bound"},
+ {operator(iceberg.Reference("all_nan"), int32(30)),
false, "should skip: 30 is larger than upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFileNan)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestEqualsNans() {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.EqualTo(iceberg.Reference("all_nan"), int32(1)),
false, "should skip: all nan column doesn't contain number"},
+ {iceberg.EqualTo(iceberg.Reference("max_nan"), int32(1)),
false, "should skip: 1 is smaller than lower bound"},
+ {iceberg.EqualTo(iceberg.Reference("max_nan"), int32(10)),
true, "should read: 10 is within bounds"},
+ {iceberg.EqualTo(iceberg.Reference("min_max_nan"), int32(1)),
true, "should read: no visibility"},
+ {iceberg.EqualTo(iceberg.Reference("all_nan_null_bounds"),
int32(1)), false, "should skip: all nan column doesn't contain number"},
+ {iceberg.EqualTo(iceberg.Reference("some_nan_correct_bounds"),
int32(1)), false, "should skip: 1 is smaller than lower bound"},
+ {iceberg.EqualTo(iceberg.Reference("some_nan_correct_bounds"),
int32(10)), true, "should read: 10 within bounds"},
+ {iceberg.EqualTo(iceberg.Reference("all_nan"), int32(30)),
false, "should skip: 30 is larger than upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFileNan)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNotEqualsNans() {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotEqualTo(iceberg.Reference("all_nan"), int32(1)),
true, "should read: no visibility"},
+ {iceberg.NotEqualTo(iceberg.Reference("max_nan"), int32(1)),
true, "should read: no visibility"},
+ {iceberg.NotEqualTo(iceberg.Reference("max_nan"), int32(10)),
true, "should read: no visibility"},
+ {iceberg.NotEqualTo(iceberg.Reference("min_max_nan"),
int32(1)), true, "should read: no visibility"},
+ {iceberg.NotEqualTo(iceberg.Reference("all_nan_null_bounds"),
int32(1)), true, "should read: no visibility"},
+
{iceberg.NotEqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(1)),
true, "should read: no visibility"},
+
{iceberg.NotEqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(10)),
true, "should read: no visibility"},
+ {iceberg.NotEqualTo(iceberg.Reference("all_nan"), int32(30)),
true, "should read: no visibility"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFileNan)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestInWithNans() {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.IsIn(iceberg.Reference("all_nan"), int32(1), 10, 30),
false, "should skip: all nan column doesn't contain number"},
+ {iceberg.IsIn(iceberg.Reference("max_nan"), int32(1), 10, 30),
true, "should read: 10 and 30 are greater than lower bound"},
+ {iceberg.IsIn(iceberg.Reference("min_max_nan"), int32(1), 10,
30), true, "should read: no visibility"},
+ {iceberg.IsIn(iceberg.Reference("all_nan_null_bounds"),
int32(1), 10, 30), false, "should skip: all nan column doesn't contain number"},
+ {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"),
int32(1), 10, 30), true, "should read: 10 within bounds"},
+ {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"),
int32(1), 30), false, "should skip: 1 and 30 not within bounds"},
+ {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"),
int32(5), 7), true, "should read: overlap with lower bound"},
+ {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"),
int32(22), 25), true, "should read: overlap with upper bound"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFileNan)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNotInWithNans() {
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ msg string
+ }{
+ {iceberg.NotIn(iceberg.Reference("all_nan"), int32(1), 10, 30),
true, "should read: no visibility"},
+ {iceberg.NotIn(iceberg.Reference("max_nan"), int32(1), 10, 30),
true, "should read: no visibility"},
+ {iceberg.NotIn(iceberg.Reference("min_max_nan"), int32(1), 10,
30), true, "should read: no visibility"},
+ {iceberg.NotIn(iceberg.Reference("all_nan_null_bounds"),
int32(1), 10, 30), true, "should read: no visibility"},
+ {iceberg.NotIn(iceberg.Reference("some_nan_correct_bounds"),
int32(1), 10, 30), true, "should read: no visibility"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(suite.dataFileNan)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestStartsWith() {
+ ref, refEmpty := iceberg.Reference("required"),
iceberg.Reference("some_empty")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ dataFile iceberg.DataFile
+ msg string
+ }{
+ {iceberg.StartsWith(ref, "a"), true, suite.dataFiles[0],
"should read: no stats"},
+ {iceberg.StartsWith(ref, "a"), true, suite.dataFiles[1],
"should read: range matches"},
+ {iceberg.StartsWith(ref, "aa"), true, suite.dataFiles[1],
"should read: range matches"},
+ {iceberg.StartsWith(ref, "aaa"), true, suite.dataFiles[1],
"should read: range matches"},
+ {iceberg.StartsWith(ref, "1s"), true, suite.dataFiles[2],
"should read: range matches"},
+ {iceberg.StartsWith(ref, "1str1x"), true, suite.dataFiles[2],
"should read: range matches"},
+ {iceberg.StartsWith(ref, "ff"), true, suite.dataFiles[3],
"should read: range matches"},
+ {iceberg.StartsWith(ref, "aB"), false, suite.dataFiles[1],
"should skip: range doesn't match"},
+ {iceberg.StartsWith(ref, "dWx"), false, suite.dataFiles[1],
"should skip: range doesn't match"},
+ {iceberg.StartsWith(ref, "5"), false, suite.dataFiles[2],
"should skip: range doesn't match"},
+ {iceberg.StartsWith(ref, "3str3x"), false, suite.dataFiles[2],
"should skip: range doesn't match"},
+ {iceberg.StartsWith(refEmpty, "房东整租霍"), true,
suite.dataFiles[0], "should read: range matches"},
+ {iceberg.StartsWith(iceberg.Reference("all_nulls"), ""), false,
suite.dataFiles[0], "should skip: range doesn't match"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(tt.dataFile)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNotStartsWith() {
+ ref, refEmpty := iceberg.Reference("required"),
iceberg.Reference("some_empty")
+
+ tests := []struct {
+ expr iceberg.BooleanExpression
+ expected bool
+ dataFile iceberg.DataFile
+ msg string
+ }{
+ {iceberg.NotStartsWith(ref, "a"), true, suite.dataFiles[0],
"should read: no stats"},
+ {iceberg.NotStartsWith(ref, "a"), true, suite.dataFiles[1],
"should read: range matches"},
+ {iceberg.NotStartsWith(ref, "aa"), true, suite.dataFiles[1],
"should read: range matches"},
+ {iceberg.NotStartsWith(ref, "aaa"), true, suite.dataFiles[1],
"should read: range matches"},
+ {iceberg.NotStartsWith(ref, "1s"), true, suite.dataFiles[2],
"should read: range matches"},
+ {iceberg.NotStartsWith(ref, "1str1x"), true,
suite.dataFiles[2], "should read: range matches"},
+ {iceberg.NotStartsWith(ref, "ff"), true, suite.dataFiles[3],
"should read: range matches"},
+ {iceberg.NotStartsWith(ref, "aB"), true, suite.dataFiles[1],
"should read: range doesn't match"},
+ {iceberg.NotStartsWith(ref, "dWx"), true, suite.dataFiles[1],
"should read: range doesn't match"},
+ {iceberg.NotStartsWith(ref, "5"), true, suite.dataFiles[2],
"should read: range doesn't match"},
+ {iceberg.NotStartsWith(ref, "3str3x"), true,
suite.dataFiles[2], "should read: range doesn't match"},
+ {iceberg.NotStartsWith(refEmpty, "房东整租霍"), true,
suite.dataFiles[0], "should read: range matches"},
+ }
+
+ for _, tt := range tests {
+ suite.Run(tt.expr.String(), func() {
+ eval, err :=
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+ suite.Require().NoError(err)
+ shouldRead, err := eval(tt.dataFile)
+ suite.Require().NoError(err)
+ suite.Equal(tt.expected, shouldRead, tt.msg)
+ })
+ }
+}
+
+func TestEvaluators(t *testing.T) {
+ suite.Run(t, &ProjectionTestSuite{})
+ suite.Run(t, &InclusiveMetricsTestSuite{})
+}
diff --git a/table/scanner.go b/table/scanner.go
new file mode 100644
index 0000000..4417fc1
--- /dev/null
+++ b/table/scanner.go
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "cmp"
+ "context"
+ "fmt"
+ "runtime"
+ "slices"
+ "sync"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/io"
+)
+
+type keyDefaultMap[K comparable, V any] struct {
+ defaultFactory func(K) V
+ data map[K]V
+
+ mx sync.RWMutex
+}
+
+func (k *keyDefaultMap[K, V]) Get(key K) V {
+ k.mx.RLock()
+ if v, ok := k.data[key]; ok {
+ k.mx.RUnlock()
+ return v
+ }
+
+ k.mx.RUnlock()
+ k.mx.Lock()
+ defer k.mx.Unlock()
+
+ v := k.defaultFactory(key)
+ k.data[key] = v
+ return v
+}
+
+func newKeyDefaultMap[K comparable, V any](factory func(K) V) keyDefaultMap[K,
V] {
+ return keyDefaultMap[K, V]{
+ data: make(map[K]V),
+ defaultFactory: factory,
+ }
+}
+
+func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error))
keyDefaultMap[K, V] {
+ return keyDefaultMap[K, V]{
+ data: make(map[K]V),
+ defaultFactory: func(k K) V {
+ v, err := factory(k)
+ if err != nil {
+ panic(err)
+ }
+ return v
+ },
+ }
+}
+
// partitionRecord adapts a slice of partition values to the positional
// Size/Get/Set accessors that the partition expression evaluator reads from.
type partitionRecord []any

// Size reports how many partition values the record holds.
func (r partitionRecord) Size() int {
	return len(r)
}

// Get returns the value stored at position pos.
func (r partitionRecord) Get(pos int) any {
	return r[pos]
}

// Set overwrites the value at position pos in place.
func (r partitionRecord) Set(pos int, val any) {
	r[pos] = val
}
+
+func getPartitionRecord(dataFile iceberg.DataFile, partitionType
*iceberg.StructType) partitionRecord {
+ partitionData := dataFile.Partition()
+
+ out := make(partitionRecord, len(partitionType.FieldList))
+ for i, f := range partitionType.FieldList {
+ out[i] = partitionData[f.Name]
+ }
+ return out
+}
+
+func openManifest(io io.IO, manifest iceberg.ManifestFile,
+ partitionFilter, metricsEval func(iceberg.DataFile) (bool, error))
([]iceberg.ManifestEntry, error) {
+
+ entries, err := manifest.FetchEntries(io, true)
+ if err != nil {
+ return nil, err
+ }
+
+ out := make([]iceberg.ManifestEntry, 0, len(entries))
+ for _, entry := range entries {
+ p, err := partitionFilter(entry.DataFile())
+ if err != nil {
+ return nil, err
+ }
+
+ m, err := metricsEval(entry.DataFile())
+ if err != nil {
+ return nil, err
+ }
+
+ if p && m {
+ out = append(out, entry)
+ }
+ }
+
+ return out, nil
+}
+
+type Scan struct {
+ metadata Metadata
+ io io.IO
+ rowFilter iceberg.BooleanExpression
+ selectedFields []string
+ caseSensitive bool
+ snapshotID *int64
+ options iceberg.Properties
+
+ partitionFilters keyDefaultMap[int, iceberg.BooleanExpression]
+}
+
+func (s *Scan) UseRef(name string) (*Scan, error) {
+ if s.snapshotID != nil {
+ return nil, fmt.Errorf("%w: cannot override ref, already set
snapshot id %d",
+ iceberg.ErrInvalidArgument, *s.snapshotID)
+ }
+
+ if snap := s.metadata.SnapshotByName(name); snap != nil {
+ out := &Scan{
+ metadata: s.metadata,
+ io: s.io,
+ rowFilter: s.rowFilter,
+ selectedFields: s.selectedFields,
+ caseSensitive: s.caseSensitive,
+ snapshotID: &snap.SnapshotID,
+ options: s.options,
+ }
+ out.partitionFilters =
newKeyDefaultMapWrapErr(out.buildPartitionProjection)
+
+ return out, nil
+ }
+
+ return nil, fmt.Errorf("%w: cannot scan unknown ref=%s",
iceberg.ErrInvalidArgument, name)
+}
+
+func (s *Scan) Snapshot() *Snapshot {
+ if s.snapshotID != nil {
+ return s.metadata.SnapshotByID(*s.snapshotID)
+ }
+ return s.metadata.CurrentSnapshot()
+}
+
// Projection returns the schema of the columns this scan selects.
//
// The base schema is the table's current schema. When the scan is pinned to a
// snapshot that records a schema ID found in the metadata, that schema is used
// instead. A selection containing "*" returns the whole base schema; otherwise
// the selected fields are projected with the scan's case sensitivity. An error
// wrapping ErrInvalidOperation is returned if the pinned snapshot is missing.
func (s *Scan) Projection() (*iceberg.Schema, error) {
	schema := s.metadata.CurrentSchema()

	if s.snapshotID != nil {
		snap := s.metadata.SnapshotByID(*s.snapshotID)
		if snap == nil {
			return nil, fmt.Errorf("%w: snapshot not found: %d",
				ErrInvalidOperation, *s.snapshotID)
		}

		if want := snap.SchemaID; want != nil {
			for _, candidate := range s.metadata.Schemas() {
				if candidate.ID != *want {
					continue
				}
				schema = candidate
				break
			}
		}
	}

	if !slices.Contains(s.selectedFields, "*") {
		return schema.Select(s.caseSensitive, s.selectedFields...)
	}
	return schema, nil
}
+
// partitionSpec returns the partition spec whose ID is specID. Spec IDs come
// from manifests (ManifestFile.PartitionSpecID) and identify a spec. They are
// matched against PartitionSpec.ID() rather than used as an index into the
// metadata's spec list. Indexing would pick the wrong spec, or panic, whenever
// that list is not dense and ordered by ID.
func (s *Scan) partitionSpec(specID int) (iceberg.PartitionSpec, error) {
	for _, spec := range s.metadata.PartitionSpecs() {
		if spec.ID() == specID {
			return spec, nil
		}
	}
	return iceberg.PartitionSpec{}, fmt.Errorf("%w: partition spec not found: %d",
		ErrInvalidMetadata, specID)
}

// partitionFilter returns the cached partition projection of the row filter
// for specID. The cache (s.partitionFilters) is built with
// newKeyDefaultMapWrapErr and reports projection failures by panicking with
// the error. That panic is turned back into an ordinary error here, so a bad
// row filter cannot crash the process (for example inside a PlanFiles worker
// goroutine). Runtime errors and non-error panics are re-raised unchanged.
func (s *Scan) partitionFilter(specID int) (expr iceberg.BooleanExpression, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		e, isErr := r.(error)
		if _, isRuntime := r.(runtime.Error); !isErr || isRuntime {
			panic(r)
		}
		expr, err = nil, e
	}()

	return s.partitionFilters.Get(specID), nil
}

// buildPartitionProjection projects the scan's row filter onto the partition
// spec identified by specID. The result is an expression over that spec's
// partition fields. Column names are bound using the scan's case sensitivity.
func (s *Scan) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) {
	spec, err := s.partitionSpec(specID)
	if err != nil {
		return nil, err
	}

	project := newInclusiveProjection(s.metadata.CurrentSchema(), spec, s.caseSensitive)
	return project(s.rowFilter)
}

// buildManifestEvaluator builds the manifest-level filter for manifests written
// with spec specID. It is based on the partition projection of the row filter.
func (s *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) {
	spec, err := s.partitionSpec(specID)
	if err != nil {
		return nil, err
	}

	partFilter, err := s.partitionFilter(specID)
	if err != nil {
		return nil, err
	}

	return newManifestEvaluator(spec, s.metadata.CurrentSchema(), partFilter, s.caseSensitive)
}

// buildPartitionEvaluator returns a filter deciding, from its partition values,
// whether a data file written with spec specID may match the row filter. If the
// spec or the projection cannot be resolved, the returned filter reports that
// error for every data file.
func (s *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) (bool, error) {
	fail := func(err error) func(iceberg.DataFile) (bool, error) {
		return func(iceberg.DataFile) (bool, error) { return false, err }
	}

	spec, err := s.partitionSpec(specID)
	if err != nil {
		return fail(err)
	}

	partExpr, err := s.partitionFilter(specID)
	if err != nil {
		return fail(err)
	}

	partType := spec.PartitionType(s.metadata.CurrentSchema())
	partSchema := iceberg.NewSchema(0, partType.FieldList...)

	return func(d iceberg.DataFile) (bool, error) {
		// A fresh evaluator is bound for every data file. This closure is
		// shared by PlanFiles' worker goroutines.
		// NOTE(review): confirm iceberg.ExpressionEvaluator's result is safe
		// for concurrent use before hoisting this out of the closure.
		fn, err := iceberg.ExpressionEvaluator(partSchema, partExpr, s.caseSensitive)
		if err != nil {
			return false, err
		}

		return fn(getPartitionRecord(d, partType))
	}
}
+
+func (s *Scan) checkSequenceNumber(minSeqNum int64, manifest
iceberg.ManifestFile) bool {
+ return manifest.ManifestContent() == iceberg.ManifestContentData ||
+ (manifest.ManifestContent() == iceberg.ManifestContentDeletes &&
+ manifest.SequenceNum() >= minSeqNum)
+}
+
+func minSequenceNum(manifests []iceberg.ManifestFile) int64 {
+ n := int64(0)
+ for _, m := range manifests {
+ if m.ManifestContent() == iceberg.ManifestContentData {
+ n = min(n, m.MinSequenceNum())
+ }
+ }
+ return n
+}
+
// matchDeletesToData returns the positional delete files that may apply to the
// data file of entry.
//
// positionalDeletes must be sorted by sequence number, because it is binary
// searched. Only deletes whose sequence number is >= the data entry's are
// considered. Of those, a delete file is kept when its file_path metrics might
// contain the data file's path. The result is non-nil even when empty.
func matchDeletesToData(entry iceberg.ManifestEntry, positionalDeletes []iceberg.ManifestEntry) ([]iceberg.DataFile, error) {
	bySeq := func(a, b iceberg.ManifestEntry) int {
		return cmp.Compare(a.SequenceNum(), b.SequenceNum())
	}
	start, _ := slices.BinarySearchFunc(positionalDeletes, entry, bySeq)

	pathFilter := iceberg.EqualTo(iceberg.Reference("file_path"), entry.DataFile().FilePath())
	mightMatch, err := newInclusiveMetricsEvaluator(iceberg.PositionalDeleteSchema,
		pathFilter, true, false)
	if err != nil {
		return nil, err
	}

	matched := make([]iceberg.DataFile, 0)
	for _, candidate := range positionalDeletes[start:] {
		df := candidate.DataFile()
		ok, err := mightMatch(df)
		switch {
		case err != nil:
			return nil, err
		case ok:
			matched = append(matched, df)
		}
	}

	return matched, nil
}
+
// PlanFiles returns the file scan tasks needed to read the rows of the scan's
// snapshot that may match its row filter. There is one task per data file,
// listing the positional delete files that may apply to it. When the scan has
// no snapshot to read, PlanFiles returns (nil, nil).
//
// Planning runs in three steps:
//  1. manifests are pruned using their partition summaries, with one
//     evaluator per partition spec ID;
//  2. the remaining manifests are read concurrently, and their entries are
//     filtered by partition values and column metrics;
//  3. each data entry is matched with the positional deletes that may apply.
//
// An error from any step, including cancellation of ctx, is returned and no
// partial plan is produced. Equality delete files are not supported yet.
func (s *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
	snap := s.Snapshot()
	if snap == nil {
		return nil, nil
	}

	manifestList, err := snap.Manifests(s.io)
	if err != nil {
		return nil, err
	}

	// step 1: filter manifests using partition summaries
	manifestList, err = s.pruneManifests(manifestList)
	if err != nil {
		return nil, err
	}

	// step 2: filter the data files in each manifest
	metricsEval, err := newInclusiveMetricsEvaluator(
		s.metadata.CurrentSchema(), s.rowFilter, s.caseSensitive,
		s.options["include_empty_files"] == "true")
	if err != nil {
		return nil, err
	}

	dataEntries, positionalDeleteEntries, err := s.readManifestEntries(ctx, manifestList, metricsEval)
	if err != nil {
		return nil, err
	}

	// step 3: match positional deletes to data files. matchDeletesToData
	// binary-searches the deletes, so they must be sorted by sequence number.
	slices.SortFunc(positionalDeleteEntries, func(a, b iceberg.ManifestEntry) int {
		return cmp.Compare(a.SequenceNum(), b.SequenceNum())
	})

	results := make([]FileScanTask, 0, len(dataEntries))
	for _, e := range dataEntries {
		deleteFiles, err := matchDeletesToData(e, positionalDeleteEntries)
		if err != nil {
			return nil, err
		}

		results = append(results, FileScanTask{
			File:        e.DataFile(),
			DeleteFiles: deleteFiles,
			Start:       0,
			Length:      e.DataFile().FileSizeBytes(),
		})
	}

	return results, nil
}

// pruneManifests drops the manifests whose partition summaries show they
// cannot hold rows matching the row filter. The summaries depend on the spec a
// manifest was written with, so evaluators are built once per spec ID and
// reused. Any error from building or running an evaluator is returned, rather
// than the manifest being silently dropped (which would yield an incomplete
// plan).
func (s *Scan) pruneManifests(manifests []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
	evaluators := make(map[int]func(iceberg.ManifestFile) (bool, error))
	kept := make([]iceberg.ManifestFile, 0, len(manifests))

	for _, mf := range manifests {
		specID := int(mf.PartitionSpecID())

		eval, ok := evaluators[specID]
		if !ok {
			var err error
			if eval, err = s.buildManifestEvaluator(specID); err != nil {
				return nil, err
			}
			evaluators[specID] = eval
		}

		use, err := eval(mf)
		if err != nil {
			return nil, err
		}
		if use {
			kept = append(kept, mf)
		}
	}

	return kept, nil
}

// readManifestEntries opens manifests concurrently, with at most one worker per
// CPU. It splits the entries that pass the partition and metrics filters into
// data entries and positional delete entries. Delete manifests older than the
// data manifests' minimum sequence number are skipped. The first worker error
// cancels the remaining work and is returned as is; cancellation of ctx is
// reported through context.Cause.
func (s *Scan) readManifestEntries(ctx context.Context, manifests []iceberg.ManifestFile,
	metricsEval func(iceberg.DataFile) (bool, error)) ([]iceberg.ManifestEntry, []iceberg.ManifestEntry, error) {

	partitionEvaluators := newKeyDefaultMap(s.buildPartitionEvaluator)
	minSeqNum := minSequenceNum(manifests)

	ctx, cancel := context.WithCancelCause(ctx)
	// Cancelling on every return path stops the workers. It also releases any
	// worker blocked on sending results that will no longer be read.
	defer cancel(nil)

	manifestChan := make(chan iceberg.ManifestFile, len(manifests))
	for _, m := range manifests {
		manifestChan <- m
	}
	close(manifestChan)

	entryChan := make(chan []iceberg.ManifestEntry, 20)

	var wg sync.WaitGroup
	nworkers := min(runtime.NumCPU(), len(manifests))
	for i := 0; i < nworkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for m := range manifestChan {
				if ctx.Err() != nil {
					return
				}
				if !s.checkSequenceNumber(minSeqNum, m) {
					continue
				}

				entries, err := openManifest(s.io, m,
					partitionEvaluators.Get(int(m.PartitionSpecID())), metricsEval)
				if err != nil {
					cancel(err)
					return
				}

				select {
				case entryChan <- entries:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	go func() {
		wg.Wait()
		close(entryChan)
	}()

	var dataEntries, positionalDeleteEntries []iceberg.ManifestEntry
	for {
		var (
			entries []iceberg.ManifestEntry
			ok      bool
		)
		select {
		case <-ctx.Done():
			return nil, nil, context.Cause(ctx)
		case entries, ok = <-entryChan:
		}

		if !ok {
			// entryChan is also closed when the workers stop early after a
			// cancellation, so only a clean shutdown yields a result.
			if cause := context.Cause(ctx); cause != nil {
				return nil, nil, cause
			}
			return dataEntries, positionalDeleteEntries, nil
		}

		for _, e := range entries {
			df := e.DataFile()
			switch df.ContentType() {
			case iceberg.EntryContentData:
				dataEntries = append(dataEntries, e)
			case iceberg.EntryContentPosDeletes:
				positionalDeleteEntries = append(positionalDeleteEntries, e)
			case iceberg.EntryContentEqDeletes:
				return nil, nil, fmt.Errorf("iceberg-go does not yet support equality deletes")
			default:
				return nil, nil, fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
					ErrInvalidMetadata, df.ContentType(), e)
			}
		}
	}
}
+
+type FileScanTask struct {
+ File iceberg.DataFile
+ DeleteFiles []iceberg.DataFile
+ Start, Length int64
+}
diff --git a/table/scanner_test.go b/table/scanner_test.go
new file mode 100644
index 0000000..a35d2dd
--- /dev/null
+++ b/table/scanner_test.go
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+
+package table_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/io"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// TestScanner plans scans against tables served by the local REST catalog and
// checks how many file scan tasks each filter produces.
func TestScanner(t *testing.T) {
	cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
	require.NoError(t, err)

	// S3 settings used when loading tables from the local test catalog.
	s3Props := iceberg.Properties{
		io.S3Region:          "us-east-1",
		io.S3AccessKeyID:     "admin",
		io.S3SecretAccessKey: "password",
	}

	type scanCase struct {
		table            string
		expr             iceberg.BooleanExpression
		expectedNumTasks int
	}

	cases := []scanCase{
		{"test_all_types", iceberg.AlwaysTrue{}, 5},
		{"test_all_types", iceberg.LessThan(iceberg.Reference("intCol"), int32(3)), 3},
		{"test_all_types", iceberg.GreaterThanEqual(iceberg.Reference("intCol"), int32(3)), 2},
		{"test_partitioned_by_identity",
			iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 8},
		{"test_partitioned_by_identity",
			iceberg.LessThan(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 4},
		{"test_partitioned_by_years", iceberg.AlwaysTrue{}, 2},
		{"test_partitioned_by_years", iceberg.LessThan(iceberg.Reference("dt"), "2023-03-05"), 1},
		{"test_partitioned_by_years", iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
		{"test_partitioned_by_months", iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
		{"test_partitioned_by_days",
			iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 8},
		{"test_partitioned_by_hours",
			iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 8},
		{"test_partitioned_by_truncate", iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e"), 8},
		{"test_partitioned_by_bucket", iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5)), 6},
		// NOTE: provisioning this table locally has been observed to produce 5
		// data files, while the GitHub Actions (CI) provisioning produces 4.
		// The cause is unknown; the expectation below matches CI.
		{"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4},
		{"test_uuid_and_fixed_unpartitioned",
			iceberg.EqualTo(iceberg.Reference("uuid_col"), "102cb62f-e6f8-4eb0-9973-d9b012ff0967"), 1},
	}

	for _, tc := range cases {
		t.Run(tc.table+" "+tc.expr.String(), func(t *testing.T) {
			ctx := context.Background()

			tbl, err := cat.LoadTable(ctx, catalog.ToRestIdentifier("default", tc.table), s3Props)
			require.NoError(t, err)

			tasks, err := tbl.Scan(tc.expr, 0, true, "*").PlanFiles(ctx)
			require.NoError(t, err)
			assert.Len(t, tasks, tc.expectedNumTasks)
		})
	}
}
+
// TestScannerWithDeletes plans scans of a merge-on-read table with positional
// deletes. It checks the delete files attached to each task for the current
// snapshot and for tagged snapshots, and that a ref-pinned scan cannot be
// re-pinned.
func TestScannerWithDeletes(t *testing.T) {
	ctx := context.Background()

	cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
	require.NoError(t, err)

	s3Props := iceberg.Properties{
		io.S3Region:          "us-east-1",
		io.S3AccessKeyID:     "admin",
		io.S3SecretAccessKey: "password",
	}
	tbl, err := cat.LoadTable(ctx,
		catalog.ToRestIdentifier("default", "test_positional_mor_deletes"), s3Props)
	require.NoError(t, err)

	// Current snapshot: one data file with one positional delete file.
	scan := tbl.Scan(iceberg.AlwaysTrue{}, 0, true, "*")
	tasks, err := scan.PlanFiles(ctx)
	require.NoError(t, err)
	assert.Len(t, tasks, 1)
	assert.Len(t, tasks[0].DeleteFiles, 1)

	// "tag_12": one data file and no delete files.
	pinned, err := scan.UseRef("tag_12")
	require.NoError(t, err)
	tasks, err = pinned.PlanFiles(ctx)
	require.NoError(t, err)
	assert.Len(t, tasks, 1)
	assert.Len(t, tasks[0].DeleteFiles, 0)

	// A scan already pinned to a ref cannot be pointed at another one.
	_, err = pinned.UseRef("without_5")
	assert.ErrorIs(t, err, iceberg.ErrInvalidArgument)

	// "without_5": one data file with one positional delete file.
	pinned, err = scan.UseRef("without_5")
	require.NoError(t, err)
	tasks, err = pinned.PlanFiles(ctx)
	require.NoError(t, err)
	assert.Len(t, tasks, 1)
	assert.Len(t, tasks[0].DeleteFiles, 1)
}
diff --git a/table/table.go b/table/table.go
index ff370ec..b4cf92c 100644
--- a/table/table.go
+++ b/table/table.go
@@ -59,6 +59,23 @@ func (t Table) Schemas() map[int]*iceberg.Schema {
return m
}
+func (t Table) Scan(rowFilter iceberg.BooleanExpression, snapshotID int64,
caseSensitive bool, fields ...string) *Scan {
+ s := &Scan{
+ metadata: t.metadata,
+ io: t.fs,
+ rowFilter: rowFilter,
+ selectedFields: fields,
+ caseSensitive: caseSensitive,
+ }
+
+ if snapshotID != 0 {
+ s.snapshotID = &snapshotID
+ }
+
+ s.partitionFilters = newKeyDefaultMapWrapErr(s.buildPartitionProjection)
+ return s
+}
+
func New(ident Identifier, meta Metadata, location string, fs io.IO) *Table {
return &Table{
identifier: ident,
diff --git a/transforms.go b/transforms.go
index 81e85b0..887d46b 100644
--- a/transforms.go
+++ b/transforms.go
@@ -19,10 +19,18 @@ package iceberg
import (
"encoding"
+ "encoding/binary"
"fmt"
+ "math"
+ "math/big"
"strconv"
"strings"
"time"
+ "unsafe"
+
+ "github.com/apache/arrow/go/v16/arrow/decimal128"
+ "github.com/google/uuid"
+ "github.com/twmb/murmur3"
)
// ParseTransform takes the string representation of a transform as
@@ -75,6 +83,9 @@ type Transform interface {
fmt.Stringer
encoding.TextMarshaler
ResultType(t Type) Type
+ Equals(Transform) bool
+ Apply(Optional[Literal]) Optional[Literal]
+ Project(name string, pred BoundPredicate) (UnboundPredicate, error)
}
// IdentityTransform uses the identity function, performing no transformation
@@ -89,6 +100,32 @@ func (IdentityTransform) String() string { return
"identity" }
func (IdentityTransform) ResultType(t Type) Type { return t }
+func (IdentityTransform) Equals(other Transform) bool {
+ _, ok := other.(IdentityTransform)
+ return ok
+}
+
+func (IdentityTransform) Apply(value Optional[Literal]) Optional[Literal] {
+ return value
+}
+
+func (t IdentityTransform) Project(name string, pred BoundPredicate)
(UnboundPredicate, error) {
+ if _, ok := pred.Term().(*BoundTransform); ok {
+ return projectTransformPredicate(t, name, pred)
+ }
+
+ switch p := pred.(type) {
+ case BoundUnaryPredicate:
+ return p.AsUnbound(Reference(name)), nil
+ case BoundLiteralPredicate:
+ return p.AsUnbound(Reference(name), p.Literal()), nil
+ case BoundSetPredicate:
+ return p.AsUnbound(Reference(name), p.Literals().Members()), nil
+ }
+
+ return nil, nil
+}
+
// VoidTransform is a transformation that always returns nil.
type VoidTransform struct{}
@@ -100,6 +137,19 @@ func (VoidTransform) String() string { return "void" }
func (VoidTransform) ResultType(t Type) Type { return t }
+func (VoidTransform) Equals(other Transform) bool {
+ _, ok := other.(VoidTransform)
+ return ok
+}
+
+func (VoidTransform) Apply(value Optional[Literal]) Optional[Literal] {
+ return Optional[Literal]{}
+}
+
+func (VoidTransform) Project(string, BoundPredicate) (UnboundPredicate, error)
{
+ return nil, nil
+}
+
// BucketTransform transforms values into a bucket partition value. It is
// parameterized by a number of buckets. Bucket partition transforms use
// a 32-bit hash of the source value to produce a positive value by mod
@@ -116,6 +166,138 @@ func (t BucketTransform) String() string { return
fmt.Sprintf("bucket[%d]", t.Nu
func (BucketTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
// hashHelperInt hashes an integer-backed value for bucketing. v, whose dynamic
// type must be exactly T, is converted to 64 bits and written as 8
// little-endian bytes, which are then hashed with 32-bit murmur3. Go's signed
// to unsigned conversion sign-extends, so an int32-based value hashes the same
// as the equal int64.
func hashHelperInt[T ~int32 | ~int64](v any) uint32 {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uint64(v.(T)))
	return murmur3.Sum32(buf[:])
}
+
+func (t BucketTransform) Equals(other Transform) bool {
+ rhs, ok := other.(BucketTransform)
+ if !ok {
+ return false
+ }
+
+ return t.NumBuckets == rhs.NumBuckets
+}
+
// Apply maps a literal to its bucket number. The bucket is the 32-bit murmur3
// hash of the value's bytes, masked to a non-negative int32 and taken modulo
// NumBuckets.
//
// Integer-backed literals (int, long, date, time, timestamp) are widened to
// int64 and hashed through hashHelperInt. The same number therefore hashes
// identically regardless of its literal width. A null input or an unsupported
// literal type yields a null result.
func (t BucketTransform) Apply(value Optional[Literal]) Optional[Literal] {
	if !value.Valid {
		return Optional[Literal]{}
	}

	var (
		hash      uint32
		asLong    int64
		isIntLike bool
	)
	switch v := value.Val.(type) {
	case TypedLiteral[[]byte]:
		hash = murmur3.Sum32(v.Value())
	case StringLiteral:
		hash = murmur3.Sum32(unsafe.Slice(unsafe.StringData(string(v)), len(v)))
	case UUIDLiteral:
		hash = murmur3.Sum32(v[:])
	case DecimalLiteral:
		// MarshalBinary's error is ignored here.
		encoded, _ := v.MarshalBinary()
		hash = murmur3.Sum32(encoded)
	case Int32Literal:
		asLong, isIntLike = int64(v), true
	case Int64Literal:
		asLong, isIntLike = int64(v), true
	case DateLiteral:
		asLong, isIntLike = int64(v), true
	case TimeLiteral:
		asLong, isIntLike = int64(v), true
	case TimestampLiteral:
		asLong, isIntLike = int64(v), true
	default:
		return Optional[Literal]{}
	}

	if isIntLike {
		hash = hashHelperInt[int64](asLong)
	}

	bucket := (int32(hash) & math.MaxInt32) % int32(t.NumBuckets)
	return Optional[Literal]{Valid: true, Val: Int32Literal(bucket)}
}
+
// Transformer returns a function that maps a raw (non-Literal) source value of
// type src to its bucket. The bucket is the murmur3 hash of the value, masked
// to a non-negative int32 and taken modulo NumBuckets. A nil input yields a
// null result.
//
// Integer-backed types hash through hashHelperInt, and the value's dynamic
// type must match src exactly (int32, Date, int64, Time or Timestamp).
// String-like and UUID types accept either []byte or their native Go type.
// NOTE(review): for a src type with no case below, the hash function stays nil
// and the returned function panics on any non-nil input.
func (t BucketTransform) Transformer(src Type) func(any) Optional[int32] {
	hashBytes := func(b []byte) uint32 { return murmur3.Sum32(b) }
	hashString := func(s string) uint32 {
		return murmur3.Sum32(unsafe.Slice(unsafe.StringData(s), len(s)))
	}

	var hash func(any) uint32
	switch src.(type) {
	case Int32Type:
		hash = hashHelperInt[int32]
	case Int64Type:
		hash = hashHelperInt[int64]
	case DateType:
		hash = hashHelperInt[Date]
	case TimeType:
		hash = hashHelperInt[Time]
	case TimestampType, TimestampTzType:
		hash = hashHelperInt[Timestamp]
	case DecimalType:
		hash = func(v any) uint32 {
			encoded, _ := DecimalLiteral(v.(Decimal)).MarshalBinary()
			return hashBytes(encoded)
		}
	case StringType, FixedType, BinaryType:
		hash = func(v any) uint32 {
			if b, ok := v.([]byte); ok {
				return hashBytes(b)
			}
			return hashString(v.(string))
		}
	case UUIDType:
		hash = func(v any) uint32 {
			if b, ok := v.([]byte); ok {
				return hashBytes(b)
			}
			u := v.(uuid.UUID)
			return hashBytes(u[:])
		}
	}

	numBuckets := int32(t.NumBuckets)
	return func(v any) Optional[int32] {
		if v == nil {
			return Optional[int32]{}
		}
		return Optional[int32]{
			Valid: true,
			Val:   (int32(hash(v)) & math.MaxInt32) % numBuckets,
		}
	}
}
+
+func (t BucketTransform) Project(name string, pred BoundPredicate)
(UnboundPredicate, error) {
+ if _, ok := pred.Term().(*BoundTransform); ok {
+ return projectTransformPredicate(t, name, pred)
+ }
+
+ transformer := t.Transformer(pred.Term().Type())
+ switch p := pred.(type) {
+ case BoundUnaryPredicate:
+ return p.AsUnbound(Reference(name)), nil
+ case BoundLiteralPredicate:
+ if p.Op() != OpEQ {
+ break
+ }
+ return p.AsUnbound(Reference(name),
transformLiteral(transformer, p.Literal())), nil
+ case BoundSetPredicate:
+ if p.Op() != OpIn {
+ break
+ }
+
+ return setApplyTransform(name, p, transformer), nil
+ }
+
+ return nil, nil
+}
+
// TruncateTransform is a transformation for truncating a value to a specified
width.
type TruncateTransform struct {
Width int
@@ -129,8 +311,177 @@ func (t TruncateTransform) String() string { return
fmt.Sprintf("truncate[%d]",
func (TruncateTransform) ResultType(t Type) Type { return t }
+func (t TruncateTransform) Equals(other Transform) bool {
+ rhs, ok := other.(TruncateTransform)
+ if !ok {
+ return false
+ }
+
+ return t.Width == rhs.Width
+}
+
// Transformer returns a function that applies truncate[Width] to a raw source
// value of type src. A nil input maps to nil. Following the Iceberg spec:
//
//   - int32 / int64: v - (((v % W) + W) % W). This rounds down to a multiple
//     of W, towards negative infinity, so truncate[10](-1) == -10. Go's %
//     keeps the sign of v, so the plain v - v%W form would give 0 instead.
//   - decimal: the same formula applied to the unscaled value.
//   - string: the longest prefix holding at most W Unicode code points, so a
//     multi-byte character is never split.
//   - binary: the first W bytes.
//
// Any other source type returns an error wrapping ErrInvalidArgument.
func (t TruncateTransform) Transformer(src Type) (func(any) any, error) {
	switch src.(type) {
	case Int32Type:
		width := int32(t.Width)
		return func(v any) any {
			if v == nil {
				return nil
			}

			val := v.(int32)
			return val - truncatePositiveRemainder(val, width)
		}, nil
	case Int64Type:
		width := int64(t.Width)
		return func(v any) any {
			if v == nil {
				return nil
			}

			val := v.(int64)
			return val - truncatePositiveRemainder(val, width)
		}, nil
	case StringType:
		return func(v any) any {
			switch v := v.(type) {
			case string:
				return v[:truncateCodePointPrefixLen(v, t.Width)]
			case []byte:
				return v[:truncateCodePointPrefixLen(string(v), t.Width)]
			default:
				return nil
			}
		}, nil
	case BinaryType:
		return func(v any) any {
			switch v := v.(type) {
			case string:
				return v[:min(len(v), t.Width)]
			case []byte:
				return v[:min(len(v), t.Width)]
			default:
				return nil
			}
		}, nil
	case DecimalType:
		bigWidth := big.NewInt(int64(t.Width))
		return func(v any) any {
			if v == nil {
				return nil
			}

			val := v.(Decimal)
			unscaled := val.Val.BigInt()
			// unscaled - (((unscaled % width) + width) % width)
			applied := (&big.Int{}).Mod(unscaled, bigWidth)
			applied.Add(applied, bigWidth).Mod(applied, bigWidth)
			val.Val = decimal128.FromBigInt(unscaled.Sub(unscaled, applied))
			return val
		}, nil
	}

	return nil, fmt.Errorf("%w: cannot truncate for type %s",
		ErrInvalidArgument, src)
}

// truncatePositiveRemainder returns v mod w in the range [0, w) for w > 0.
// Unlike Go's %, the result never takes the sign of a negative v.
func truncatePositiveRemainder[T ~int32 | ~int64](v, w T) T {
	r := v % w
	if r < 0 {
		r += w
	}
	return r
}

// truncateCodePointPrefixLen returns the byte length of the longest prefix of
// s that contains at most n code points. Each invalid UTF-8 byte counts as one
// code point, as in a Go range over a string.
func truncateCodePointPrefixLen(s string, n int) int {
	count := 0
	for i := range s {
		if count == n {
			return i
		}
		count++
	}
	return len(s)
}
+
+func (t TruncateTransform) Apply(value Optional[Literal]) (out
Optional[Literal]) {
+ if !value.Valid {
+ return
+ }
+
+ fn, err := t.Transformer(value.Val.Type())
+ if err != nil {
+ return
+ }
+
+ out.Valid = true
+ switch v := value.Val.(type) {
+ case Int32Literal:
+ out.Val = Int32Literal(fn(int32(v)).(int32))
+ case Int64Literal:
+ out.Val = Int64Literal(fn(int64(v)).(int64))
+ case DecimalLiteral:
+ out.Val = DecimalLiteral(fn(Decimal(v)).(Decimal))
+ case StringLiteral:
+ out.Val = StringLiteral(fn(string(v)).(string))
+ case BinaryLiteral:
+ out.Val = BinaryLiteral(fn([]byte(v)).([]byte))
+ }
+
+ return
+}
+
+func (t TruncateTransform) Project(name string, pred BoundPredicate)
(UnboundPredicate, error) {
+ if _, ok := pred.Term().(*BoundTransform); ok {
+ return projectTransformPredicate(t, name, pred)
+ }
+
+ fieldType := pred.Term().Ref().Field().Type
+
+ transformer, err := t.Transformer(fieldType)
+ if err != nil {
+ return nil, err
+ }
+
+ switch p := pred.(type) {
+ case BoundUnaryPredicate:
+ return p.AsUnbound(Reference(name)), nil
+ case BoundSetPredicate:
+ if p.Op() != OpIn {
+ break
+ }
+
+ switch fieldType.(type) {
+ case Int32Type:
+ return setApplyTransform(name, p,
wrapTransformFn[int32](transformer)), nil
+ case Int64Type:
+ return setApplyTransform(name, p,
wrapTransformFn[int64](transformer)), nil
+ case DecimalType:
+ return setApplyTransform(name, p,
wrapTransformFn[Decimal](transformer)), nil
+ case StringType:
+ return setApplyTransform(name, p,
wrapTransformFn[string](transformer)), nil
+ case BinaryType:
+ return setApplyTransform(name, p,
wrapTransformFn[[]byte](transformer)), nil
+ }
+ case BoundLiteralPredicate:
+ switch fieldType.(type) {
+ case Int32Type:
+ return truncateNumber(name, p,
wrapTransformFn[int32](transformer))
+ case Int64Type:
+ return truncateNumber(name, p,
wrapTransformFn[int64](transformer))
+ case DecimalType:
+ return truncateNumber(name, p,
wrapTransformFn[Decimal](transformer))
+ case StringType:
+ return truncateArray(name, p,
wrapTransformFn[string](transformer))
+ case BinaryType:
+ return truncateArray(name, p,
wrapTransformFn[[]byte](transformer))
+ }
+ }
+
+ return nil, nil
+}
+
var epochTM = time.Unix(0, 0).UTC()
+type timeTransform interface {
+ Transform
+ Transformer(Type) (func(any) Optional[int32], error)
+}
+
+func projectTimeTransform(t timeTransform, name string, pred BoundPredicate)
(UnboundPredicate, error) {
+ if _, ok := pred.Term().(*BoundTransform); ok {
+ return projectTransformPredicate(t, name, pred)
+ }
+
+ transformer, err := t.Transformer(pred.Term().Ref().Type())
+ if err != nil {
+ return nil, err
+ }
+
+ switch p := pred.(type) {
+ case BoundUnaryPredicate:
+ return p.AsUnbound(Reference(name)), nil
+ case BoundLiteralPredicate:
+ return truncateNumber(name, p, transformer)
+ case BoundSetPredicate:
+ if p.Op() != OpIn {
+ break
+ }
+
+ return setApplyTransform(name, p, transformer), nil
+ }
+
+ return nil, nil
+}
+
// YearTransform transforms a datetime value into a year value.
type YearTransform struct{}
@@ -142,6 +493,62 @@ func (YearTransform) String() string { return "year" }
func (YearTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
// Equals reports whether other is also a YearTransform.
func (YearTransform) Equals(other Transform) bool {
	switch other.(type) {
	case YearTransform:
		return true
	default:
		return false
	}
}

// Transformer returns a function mapping a raw date or timestamp value to the
// number of years between 1970 and the value's year. A nil input yields a null
// result. Other source types return an error wrapping ErrInvalidArgument.
func (YearTransform) Transformer(src Type) (func(any) Optional[int32], error) {
	var yearOf func(any) int
	switch src.(type) {
	case DateType:
		yearOf = func(v any) int { return v.(Date).ToTime().Year() }
	case TimestampType, TimestampTzType:
		yearOf = func(v any) int { return v.(Timestamp).ToTime().Year() }
	default:
		return nil, fmt.Errorf("%w: cannot apply year transform for type %s",
			ErrInvalidArgument, src)
	}

	return func(v any) Optional[int32] {
		if v == nil {
			return Optional[int32]{}
		}
		return Optional[int32]{Valid: true, Val: int32(yearOf(v) - epochTM.Year())}
	}, nil
}

// Apply maps a date or timestamp literal to its year ordinal (years since
// 1970). Null inputs and other literal types yield a null result.
func (YearTransform) Apply(value Optional[Literal]) (out Optional[Literal]) {
	if !value.Valid {
		return
	}

	var year int
	switch v := value.Val.(type) {
	case DateLiteral:
		year = Date(v).ToTime().Year()
	case TimestampLiteral:
		year = Timestamp(v).ToTime().Year()
	default:
		return
	}

	return Optional[Literal]{Valid: true, Val: Int32Literal(year - epochTM.Year())}
}

// Project converts pred on the source column into a predicate on the year
// partition field name, using projectTimeTransform.
func (t YearTransform) Project(name string, pred BoundPredicate) (UnboundPredicate, error) {
	return projectTimeTransform(t, name, pred)
}
+
// MonthTransform transforms a datetime value into a month value.
type MonthTransform struct{}
@@ -153,6 +560,70 @@ func (MonthTransform) String() string { return "month" }
func (MonthTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
// Equals reports whether other is also a MonthTransform.
func (MonthTransform) Equals(other Transform) bool {
	switch other.(type) {
	case MonthTransform:
		return true
	default:
		return false
	}
}

// Transformer returns a function mapping a raw date or timestamp value to the
// number of months between January 1970 and the value's month. A nil input
// yields a null result. Other source types return an error wrapping
// ErrInvalidArgument.
func (MonthTransform) Transformer(src Type) (func(any) Optional[int32], error) {
	var toTime func(any) time.Time
	switch src.(type) {
	case DateType:
		toTime = func(v any) time.Time { return v.(Date).ToTime() }
	case TimestampType, TimestampTzType:
		toTime = func(v any) time.Time { return v.(Timestamp).ToTime() }
	default:
		return nil, fmt.Errorf("%w: cannot apply month transform for type %s",
			ErrInvalidArgument, src)
	}

	return func(v any) Optional[int32] {
		if v == nil {
			return Optional[int32]{}
		}

		tm := toTime(v)
		months := (tm.Year()-epochTM.Year())*12 + (int(tm.Month()) - int(epochTM.Month()))
		return Optional[int32]{Valid: true, Val: int32(months)}
	}, nil
}

// Apply maps a date or timestamp literal to its month ordinal (months since
// January 1970). Null inputs and other literal types yield a null result.
func (MonthTransform) Apply(value Optional[Literal]) (out Optional[Literal]) {
	if !value.Valid {
		return
	}

	var tm time.Time
	switch v := value.Val.(type) {
	case DateLiteral:
		tm = Date(v).ToTime()
	case TimestampLiteral:
		tm = Timestamp(v).ToTime()
	default:
		return
	}

	months := (tm.Year()-epochTM.Year())*12 + (int(tm.Month()) - int(epochTM.Month()))
	return Optional[Literal]{Valid: true, Val: Int32Literal(int32(months))}
}

// Project converts pred on the source column into a predicate on the month
// partition field name, using projectTimeTransform.
func (t MonthTransform) Project(name string, pred BoundPredicate) (UnboundPredicate, error) {
	return projectTimeTransform(t, name, pred)
}
+
// DayTransform transforms a datetime value into a date value.
type DayTransform struct{}
@@ -164,6 +635,59 @@ func (DayTransform) String() string { return "day" }
func (DayTransform) ResultType(Type) Type { return PrimitiveTypes.Date }
// Equals reports whether other is also a DayTransform.
func (DayTransform) Equals(other Transform) bool {
	switch other.(type) {
	case DayTransform:
		return true
	default:
		return false
	}
}

// Transformer returns a function mapping a raw date or timestamp value to its
// day ordinal. A date maps to its own numeric value; a timestamp maps through
// Timestamp.ToDate. A nil input yields a null result. Other source types
// return an error wrapping ErrInvalidArgument.
func (DayTransform) Transformer(src Type) (func(any) Optional[int32], error) {
	var toDays func(any) int32
	switch src.(type) {
	case DateType:
		toDays = func(v any) int32 { return int32(v.(Date)) }
	case TimestampType, TimestampTzType:
		toDays = func(v any) int32 { return int32(v.(Timestamp).ToDate()) }
	default:
		return nil, fmt.Errorf("%w: cannot apply day transform for type %s",
			ErrInvalidArgument, src)
	}

	return func(v any) Optional[int32] {
		if v == nil {
			return Optional[int32]{}
		}
		return Optional[int32]{Valid: true, Val: toDays(v)}
	}, nil
}

// Apply maps a date or timestamp literal to its day ordinal as an
// Int32Literal. Null inputs and other literal types yield a null result.
func (DayTransform) Apply(value Optional[Literal]) (out Optional[Literal]) {
	if !value.Valid {
		return
	}

	var days Int32Literal
	switch v := value.Val.(type) {
	case DateLiteral:
		days = Int32Literal(v)
	case TimestampLiteral:
		days = Int32Literal(Timestamp(v).ToDate())
	default:
		return
	}

	return Optional[Literal]{Valid: true, Val: days}
}

// Project converts pred on the source column into a predicate on the day
// partition field name, using projectTimeTransform.
func (t DayTransform) Project(name string, pred BoundPredicate) (UnboundPredicate, error) {
	return projectTimeTransform(t, name, pred)
}
+
// HourTransform transforms a datetime value into an hour value.
type HourTransform struct{}
@@ -174,3 +698,174 @@ func (t HourTransform) MarshalText() ([]byte, error) {
func (HourTransform) String() string { return "hour" }
// ResultType reports the type produced by the hour transform. It is
// PrimitiveTypes.Int32 for every source type.
func (HourTransform) ResultType(_ Type) Type {
	return PrimitiveTypes.Int32
}
+
+func (HourTransform) Equals(other Transform) bool {
+ _, ok := other.(HourTransform)
+ return ok
+}
+
+func (HourTransform) Transformer(src Type) (func(any) Optional[int32], error) {
+ switch src.(type) {
+ case TimestampType, TimestampTzType:
+ const factor = int64(time.Hour / time.Microsecond)
+ return func(v any) Optional[int32] {
+ if v == nil {
+ return Optional[int32]{}
+ }
+
+ return Optional[int32]{
+ Valid: true,
+ Val: int32(int64(v.(Timestamp)) / factor),
+ }
+ }, nil
+ }
+
+ return nil, fmt.Errorf("%w: cannot apply hour transform for type %s",
+ ErrInvalidArgument, src)
+}
+
+func (HourTransform) Apply(value Optional[Literal]) (out Optional[Literal]) {
+ if !value.Valid {
+ return
+ }
+
+ switch v := value.Val.(type) {
+ case TimestampLiteral:
+ const factor = int64(time.Hour / time.Microsecond)
+ out.Valid, out.Val = true, Int32Literal(int32(int64(v)/factor))
+ }
+
+ return
+}
+
+func (t HourTransform) Project(name string, pred BoundPredicate)
(UnboundPredicate, error) {
+ return projectTimeTransform(t, name, pred)
+}
+
+func removeTransform(partName string, pred BoundPredicate) (UnboundPredicate,
error) {
+ switch p := pred.(type) {
+ case BoundUnaryPredicate:
+ return p.AsUnbound(Reference(partName)), nil
+ case BoundLiteralPredicate:
+ return p.AsUnbound(Reference(partName), p.Literal()), nil
+ case BoundSetPredicate:
+ return p.AsUnbound(Reference(partName),
p.Literals().Members()), nil
+ }
+
+ return nil, fmt.Errorf("%w: cannot replace transform in unknown
predicate: %s",
+ ErrInvalidArgument, pred)
+}
+
+func projectTransformPredicate(t Transform, partitionName string, pred
BoundPredicate) (UnboundPredicate, error) {
+ term := pred.Term()
+ bt, ok := term.(*BoundTransform)
+ if !ok || !t.Equals(bt.transform) {
+ return nil, nil
+ }
+
+ return removeTransform(partitionName, pred)
+}
+
+func transformLiteral[T LiteralType](fn func(any) Optional[T], lit Literal)
Literal {
+ switch l := lit.(type) {
+ case BoolLiteral:
+ return NewLiteral(fn(bool(l)).Val)
+ case Int32Literal:
+ return NewLiteral(fn(int32(l)).Val)
+ case Int64Literal:
+ return NewLiteral(fn(int64(l)).Val)
+ case Float32Literal:
+ return NewLiteral(fn(float32(l)).Val)
+ case Float64Literal:
+ return NewLiteral(fn(float64(l)).Val)
+ case DateLiteral:
+ return NewLiteral(fn(Date(l)).Val)
+ case TimeLiteral:
+ return NewLiteral(fn(Time(l)).Val)
+ case TimestampLiteral:
+ return NewLiteral(fn(Timestamp(l)).Val)
+ case StringLiteral:
+ return NewLiteral(fn(string(l)).Val)
+ case FixedLiteral:
+ return NewLiteral(fn([]byte(l)).Val)
+ case BinaryLiteral:
+ return NewLiteral(fn([]byte(l)).Val)
+ case UUIDLiteral:
+ return NewLiteral(fn(uuid.UUID(l)).Val)
+ case DecimalLiteral:
+ return NewLiteral(fn(Decimal(l)).Val)
+ }
+
+ panic("invalid literal type")
+}
+
+func wrapTransformFn[T LiteralType](fn func(any) any) func(any) Optional[T] {
+ return func(v any) Optional[T] {
+ out := fn(v)
+ if out == nil {
+ return Optional[T]{}
+ }
+ return Optional[T]{Valid: true, Val: out.(T)}
+ }
+}
+
+func truncateNumber[T LiteralType](name string, pred BoundLiteralPredicate, fn
func(any) Optional[T]) (UnboundPredicate, error) {
+ boundary, ok := pred.Literal().(NumericLiteral)
+ if !ok {
+ return nil, fmt.Errorf("%w: expected numeric literal, got %s",
+ ErrInvalidArgument, boundary.Type())
+ }
+
+ switch pred.Op() {
+ case OpLT:
+ return LiteralPredicate(OpLTEQ, Reference(name),
+ transformLiteral(fn, boundary.Decrement())), nil
+ case OpLTEQ:
+ return LiteralPredicate(OpLTEQ, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ case OpGT:
+ return LiteralPredicate(OpGTEQ, Reference(name),
+ transformLiteral(fn, boundary.Increment())), nil
+ case OpGTEQ:
+ return LiteralPredicate(OpGTEQ, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ case OpEQ:
+ return LiteralPredicate(OpEQ, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ }
+
+ return nil, nil
+}
+
+func truncateArray[T LiteralType](name string, pred BoundLiteralPredicate, fn
func(any) Optional[T]) (UnboundPredicate, error) {
+ boundary := pred.Literal()
+
+ switch pred.Op() {
+ case OpLT, OpLTEQ:
+ return LiteralPredicate(OpLTEQ, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ case OpGT, OpGTEQ:
+ return LiteralPredicate(OpGTEQ, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ case OpEQ:
+ return LiteralPredicate(OpEQ, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ case OpStartsWith:
+ return LiteralPredicate(OpStartsWith, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ case OpNotStartsWith:
+ return LiteralPredicate(OpNotStartsWith, Reference(name),
+ transformLiteral(fn, boundary)), nil
+ }
+
+ return nil, nil
+}
+
+func setApplyTransform[T LiteralType](name string, pred BoundSetPredicate, fn
func(any) Optional[T]) UnboundPredicate {
+ lits := pred.Literals().Members()
+ for i, l := range lits {
+ lits[i] = transformLiteral(fn, l)
+ }
+
+ return pred.AsUnbound(Reference(name), lits)
+}