This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e1ba00  feat(table/scanner): Initial pass for planning a scan and returning the files to use (#118)
6e1ba00 is described below

commit 6e1ba00940e03992f5546dd0ca110a8096866c21
Author: Matt Topol <[email protected]>
AuthorDate: Fri Aug 23 09:13:10 2024 -0400

    feat(table/scanner): Initial pass for planning a scan and returning the files to use (#118)
---
 .github/workflows/go-ci.yml          |    4 +-
 .github/workflows/go-integration.yml |   71 ++
 dev/Dockerfile                       |   25 +
 dev/docker-compose.yml               |    8 +-
 dev/provision.py                     |  379 ++++++++++
 exprs.go                             |   44 +-
 go.mod                               |    1 +
 go.sum                               |    2 +
 io/s3.go                             |    8 +-
 manifest.go                          |    2 -
 table/evaluators.go                  |  635 +++++++++++++++++
 table/evaluators_test.go             | 1294 ++++++++++++++++++++++++++++++++++
 table/scanner.go                     |  395 +++++++++++
 table/scanner_test.go                |  124 ++++
 table/table.go                       |   17 +
 transforms.go                        |  695 ++++++++++++++++++
 16 files changed, 3694 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml
index a6014fd..e82a3ed 100644
--- a/.github/workflows/go-ci.yml
+++ b/.github/workflows/go-ci.yml
@@ -20,7 +20,7 @@ name: Go
 on:
   push:
     branches:
-      - 'master'
+      - 'main'
     tags:
       - 'v**'          
   pull_request:    
@@ -39,7 +39,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        go: [ '1.21' ]
+        go: [ '1.21', '1.22' ]
         os: [ 'ubuntu-latest', 'windows-latest', 'macos-latest' ]
     steps:
     - uses: actions/checkout@v3
diff --git a/.github/workflows/go-integration.yml b/.github/workflows/go-integration.yml
new file mode 100644
index 0000000..b736ef3
--- /dev/null
+++ b/.github/workflows/go-integration.yml
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "Go Integration"
+
+on:
+  push:
+    branches:
+      - 'main'
+    tags:
+      - 'v**'
+  pull_request:
+
+concurrency:
+  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ 
github.workflow }}
+  cancel-in-progress: ${{ github.event_name == 'pull_request' }}
+
+permissions:
+  contents: read
+
+jobs:
+  integration-test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          fetch-depth: 2
+      - name: Install Go
+        uses: actions/setup-go@v4
+        with:
+          go-version: 1.22
+          cache: true
+          cache-dependency-path: go.sum
+
+      - name: Start docker
+        run: |
+          docker compose -f dev/docker-compose.yml up -d
+          sleep 10
+      - name: Provision Tables
+        run: |
+          docker compose -f dev/docker-compose.yml exec -T spark-iceberg 
ipython ./provision.py
+          sleep 10
+
+      - name: Get minio container IP
+        run: |
+          echo "AWS_S3_ENDPOINT=http://$(docker inspect -f 
'{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' minio):9000" >> 
$GITHUB_ENV
+
+      - name: Run integration tests
+        env:
+          AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}"
+          AWS_REGION: "us-east-1"
+        run: |          
+          go test -tags integration -v -run="^TestScanner" ./table
+
+      - name: Show debug logs
+        if: ${{ failure() }}
+        run: docker compose -f dev/docker-compose.yml logs
diff --git a/dev/Dockerfile b/dev/Dockerfile
new file mode 100644
index 0000000..c8b9e65
--- /dev/null
+++ b/dev/Dockerfile
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM tabulario/spark-iceberg
+
+RUN pip3 install -q ipython
+RUN pip3 install pyiceberg[s3fs,hive]
+RUN pip3 install pyarrow
+
+COPY provision.py .
+
+ENTRYPOINT ["./entrypoint.sh"]
+CMD ["notebook"]
\ No newline at end of file
diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml
index 2978c42..63821cd 100644
--- a/dev/docker-compose.yml
+++ b/dev/docker-compose.yml
@@ -18,11 +18,12 @@ version: "3"
 
 services:
   spark-iceberg:
-    image: tabulario/spark-iceberg
+    image: pyiceberg-spark
     container_name: spark-iceberg    
+    build: .
     networks:
       iceberg_net:
-    depends_on:
+    depends_on:      
       - rest
       - minio
     volumes:
@@ -85,5 +86,6 @@ services:
       /usr/bin/mc policy set public minio/warehouse;
       tail -f /dev/null
       "
+
 networks:
-  iceberg_net:
\ No newline at end of file
+  iceberg_net:
diff --git a/dev/provision.py b/dev/provision.py
new file mode 100644
index 0000000..77906c1
--- /dev/null
+++ b/dev/provision.py
@@ -0,0 +1,379 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import current_date, date_add, expr
+
+from pyiceberg.catalog import load_catalog
+from pyiceberg.schema import Schema
+from pyiceberg.types import FixedType, NestedField, UUIDType
+
+spark = SparkSession.builder.getOrCreate()
+
+catalogs = {
+    'rest': load_catalog(
+        "rest",
+        **{
+            "type": "rest",
+            "uri": "http://rest:8181",
+            "s3.endpoint": "http://minio:9000",
+            "s3.access-key-id": "admin",
+            "s3.secret-access-key": "password",
+        },
+    ),
+}
+
+for catalog_name, catalog in catalogs.items():
+    spark.sql(
+        f"""
+      CREATE DATABASE IF NOT EXISTS default;
+    """
+    )
+
+    schema = Schema(
+        NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), 
required=False),
+        NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), 
required=False),
+    )
+
+    
catalog.create_table(identifier=f"default.test_uuid_and_fixed_unpartitioned", 
schema=schema)
+
+    spark.sql(
+        f"""
+        INSERT INTO default.test_uuid_and_fixed_unpartitioned VALUES
+        ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', 
CAST('1234567890123456789012345' AS BINARY)),
+        ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', 
CAST('1231231231231231231231231' AS BINARY)),
+        ('639cccce-c9d2-494a-a78c-278ab234f024', 
CAST('12345678901234567ass12345' AS BINARY)),
+        ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', 
CAST('asdasasdads12312312312111' AS BINARY)),
+        ('923dae77-83d6-47cd-b4b0-d383e64ee57e', 
CAST('qweeqwwqq1231231231231111' AS BINARY));
+        """
+    )
+
+    spark.sql(
+        f"""
+      CREATE OR REPLACE TABLE default.test_null_nan
+      USING iceberg
+      AS SELECT
+        1            AS idx,
+        float('NaN') AS col_numeric
+    UNION ALL SELECT
+        2            AS idx,
+        null         AS col_numeric
+    UNION ALL SELECT
+        3            AS idx,
+        1            AS col_numeric;
+    """
+    )
+
+    spark.sql(
+        f"""
+      CREATE OR REPLACE TABLE default.test_null_nan_rewritten
+      USING iceberg
+      AS SELECT * FROM default.test_null_nan;
+    """
+    )
+
+    spark.sql(
+        f"""
+    CREATE OR REPLACE TABLE default.test_limit as
+      SELECT * LATERAL VIEW explode(ARRAY(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) AS 
idx;
+    """
+    )
+
+    spark.sql(
+        f"""
+    CREATE OR REPLACE TABLE default.test_positional_mor_deletes (
+        dt     date,
+        number integer,
+        letter string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'write.delete.mode'='merge-on-read',
+        'write.update.mode'='merge-on-read',
+        'write.merge.mode'='merge-on-read',
+        'format-version'='2'
+    );
+    """
+    )
+
+    # Partitioning is not really needed, but there is a bug:
+    # https://github.com/apache/iceberg/pull/7685
+    spark.sql(f"ALTER TABLE default.test_positional_mor_deletes ADD PARTITION 
FIELD years(dt) AS dt_years")
+
+    spark.sql(
+        f"""
+    INSERT INTO default.test_positional_mor_deletes
+    VALUES
+        (CAST('2023-03-01' AS date), 1, 'a'),
+        (CAST('2023-03-02' AS date), 2, 'b'),
+        (CAST('2023-03-03' AS date), 3, 'c'),
+        (CAST('2023-03-04' AS date), 4, 'd'),
+        (CAST('2023-03-05' AS date), 5, 'e'),
+        (CAST('2023-03-06' AS date), 6, 'f'),
+        (CAST('2023-03-07' AS date), 7, 'g'),
+        (CAST('2023-03-08' AS date), 8, 'h'),
+        (CAST('2023-03-09' AS date), 9, 'i'),
+        (CAST('2023-03-10' AS date), 10, 'j'),
+        (CAST('2023-03-11' AS date), 11, 'k'),
+        (CAST('2023-03-12' AS date), 12, 'l');
+    """
+    )
+
+    spark.sql(f"ALTER TABLE default.test_positional_mor_deletes CREATE TAG 
tag_12")
+
+    spark.sql(f"ALTER TABLE default.test_positional_mor_deletes CREATE BRANCH 
without_5")
+
+    spark.sql(f"DELETE FROM 
default.test_positional_mor_deletes.branch_without_5 WHERE number = 5")
+
+    spark.sql(f"DELETE FROM default.test_positional_mor_deletes WHERE number = 
9")
+
+    spark.sql(
+        f"""
+      CREATE OR REPLACE TABLE default.test_positional_mor_double_deletes (
+        dt     date,
+        number integer,
+        letter string
+      )
+      USING iceberg
+      TBLPROPERTIES (
+        'write.delete.mode'='merge-on-read',
+        'write.update.mode'='merge-on-read',
+        'write.merge.mode'='merge-on-read',
+        'format-version'='2'
+      );
+    """
+    )
+
+    spark.sql(f"ALTER TABLE default.test_positional_mor_double_deletes ADD 
PARTITION FIELD years(dt) AS dt_years")
+
+    spark.sql(
+        f"""
+    INSERT INTO default.test_positional_mor_double_deletes
+    VALUES
+        (CAST('2023-03-01' AS date), 1, 'a'),
+        (CAST('2023-03-02' AS date), 2, 'b'),
+        (CAST('2023-03-03' AS date), 3, 'c'),
+        (CAST('2023-03-04' AS date), 4, 'd'),
+        (CAST('2023-03-05' AS date), 5, 'e'),
+        (CAST('2023-03-06' AS date), 6, 'f'),
+        (CAST('2023-03-07' AS date), 7, 'g'),
+        (CAST('2023-03-08' AS date), 8, 'h'),
+        (CAST('2023-03-09' AS date), 9, 'i'),
+        (CAST('2023-03-10' AS date), 10, 'j'),
+        (CAST('2023-03-11' AS date), 11, 'k'),
+        (CAST('2023-03-12' AS date), 12, 'l');
+    """
+    )
+
+    spark.sql(f"DELETE FROM default.test_positional_mor_double_deletes WHERE 
number = 9")
+
+    spark.sql(f"DELETE FROM default.test_positional_mor_double_deletes WHERE 
letter == 'f'")
+
+    all_types_dataframe = (
+        spark.range(0, 5, 1, 5)
+        .withColumnRenamed("id", "longCol")
+        .withColumn("intCol", expr("CAST(longCol AS INT)"))
+        .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+        .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+        .withColumn("dateCol", date_add(current_date(), 1))
+        .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+        .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
+        .withColumn("booleanCol", expr("longCol > 5"))
+        .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
+        .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
+        .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
+        .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
+        .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
+        .withColumn("arrayCol", expr("ARRAY(longCol)"))
+        .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))
+    )
+
+    
all_types_dataframe.writeTo(f"default.test_all_types").tableProperty("format-version",
 "2").partitionedBy(
+        "intCol"
+    ).createOrReplace()
+
+    for table_name, partition in [
+        ("test_partitioned_by_identity", "ts"),
+        ("test_partitioned_by_years", "years(dt)"),
+        ("test_partitioned_by_months", "months(dt)"),
+        ("test_partitioned_by_days", "days(ts)"),
+        ("test_partitioned_by_hours", "hours(ts)"),
+        ("test_partitioned_by_truncate", "truncate(1, letter)"),
+        ("test_partitioned_by_bucket", "bucket(16, number)"),
+    ]:
+        spark.sql(
+            f"""
+          CREATE OR REPLACE TABLE default.{table_name} (
+            dt     date,
+            ts     timestamp,
+            number integer,
+            letter string
+          )
+          USING iceberg;
+        """
+        )
+
+        spark.sql(f"ALTER TABLE default.{table_name} ADD PARTITION FIELD 
{partition}")
+
+        spark.sql(
+            f"""
+        INSERT INTO default.{table_name}
+        VALUES
+            (CAST('2022-03-01' AS date), CAST('2022-03-01 01:22:00' AS 
timestamp), 1, 'a'),
+            (CAST('2022-03-02' AS date), CAST('2022-03-02 02:22:00' AS 
timestamp), 2, 'b'),
+            (CAST('2022-03-03' AS date), CAST('2022-03-03 03:22:00' AS 
timestamp), 3, 'c'),
+            (CAST('2022-03-04' AS date), CAST('2022-03-04 04:22:00' AS 
timestamp), 4, 'd'),
+            (CAST('2023-03-05' AS date), CAST('2023-03-05 05:22:00' AS 
timestamp), 5, 'e'),
+            (CAST('2023-03-06' AS date), CAST('2023-03-06 06:22:00' AS 
timestamp), 6, 'f'),
+            (CAST('2023-03-07' AS date), CAST('2023-03-07 07:22:00' AS 
timestamp), 7, 'g'),
+            (CAST('2023-03-08' AS date), CAST('2023-03-08 08:22:00' AS 
timestamp), 8, 'h'),
+            (CAST('2023-03-09' AS date), CAST('2023-03-09 09:22:00' AS 
timestamp), 9, 'i'),
+            (CAST('2023-03-10' AS date), CAST('2023-03-10 10:22:00' AS 
timestamp), 10, 'j'),
+            (CAST('2023-03-11' AS date), CAST('2023-03-11 11:22:00' AS 
timestamp), 11, 'k'),
+            (CAST('2023-03-12' AS date), CAST('2023-03-12 12:22:00' AS 
timestamp), 12, 'l');
+        """
+        )
+
+    # There is an issue with CREATE OR REPLACE
+    # https://github.com/apache/iceberg/issues/8756
+    spark.sql(f"DROP TABLE IF EXISTS default.test_table_version")
+
+    spark.sql(
+        f"""
+    CREATE TABLE default.test_table_version (
+        dt     date,
+        number integer,
+        letter string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'format-version'='1'
+    );
+    """
+    )
+
+    spark.sql(
+        f"""
+    CREATE TABLE default.test_table_sanitized_character (
+        `letter/abc` string
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'format-version'='1'
+    );
+    """
+    )
+
+    spark.sql(
+        f"""
+    INSERT INTO default.test_table_sanitized_character
+    VALUES
+        ('123')
+    """
+    )
+
+    spark.sql(
+        f"""
+    INSERT INTO default.test_table_sanitized_character
+    VALUES
+        ('123')
+    """
+    )
+
+    spark.sql(
+        f"""
+    CREATE TABLE default.test_table_add_column (
+        a string
+    )
+    USING iceberg
+    """
+    )
+
+    spark.sql(f"INSERT INTO default.test_table_add_column VALUES ('1')")
+
+    spark.sql(f"ALTER TABLE default.test_table_add_column ADD COLUMN b string")
+
+    spark.sql(f"INSERT INTO default.test_table_add_column VALUES ('2', '2')")
+
+    spark.sql(
+        f"""
+    CREATE TABLE default.test_table_empty_list_and_map (
+        col_list             array<int>,
+        col_map              map<int, int>,
+        col_list_with_struct array<struct<test:int>>
+    )
+    USING iceberg
+    TBLPROPERTIES (
+        'format-version'='1'
+    );
+    """
+    )
+
+    spark.sql(
+        f"""
+    INSERT INTO default.test_table_empty_list_and_map
+    VALUES (null, null, null),
+           (array(), map(), array(struct(1)))
+    """
+    )
+
+    spark.sql(
+        f"""
+        CREATE OR REPLACE TABLE default.test_table_snapshot_operations (
+            number integer
+        )
+        USING iceberg
+        TBLPROPERTIES (
+            'format-version'='2'
+        );
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO default.test_table_snapshot_operations
+        VALUES (1)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO default.test_table_snapshot_operations
+        VALUES (2)
+        """
+    )
+
+    spark.sql(
+        f"""
+        DELETE FROM default.test_table_snapshot_operations
+        WHERE number = 2
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO default.test_table_snapshot_operations
+        VALUES (3)
+        """
+    )
+
+    spark.sql(
+        f"""
+        INSERT INTO default.test_table_snapshot_operations
+        VALUES (4)
+        """
+    )
\ No newline at end of file
diff --git a/exprs.go b/exprs.go
index bc451ca..4bdb8c1 100644
--- a/exprs.go
+++ b/exprs.go
@@ -980,8 +980,50 @@ func (bsp *boundSetPredicate[T]) String() string {
        return fmt.Sprintf("Bound%s(term=%s, {%v})", bsp.op, bsp.term, 
bsp.lits.Members())
 }
 func (bsp *boundSetPredicate[T]) AsUnbound(r Reference, lits []Literal) 
UnboundPredicate {
-       return &unboundSetPredicate{op: bsp.op, term: r, lits: 
newLiteralSet(lits...)}
+       litSet := newLiteralSet(lits...)
+       if litSet.Len() == 1 {
+               switch bsp.op {
+               case OpIn:
+                       return LiteralPredicate(OpEQ, r, lits[0])
+               case OpNotIn:
+                       return LiteralPredicate(OpNEQ, r, lits[0])
+               }
+       }
+
+       return &unboundSetPredicate{op: bsp.op, term: r, lits: litSet}
 }
+
 func (bsp *boundSetPredicate[T]) Literals() Set[Literal] {
        return bsp.lits
 }
+
+type BoundTransform struct {
+       transform Transform
+       term      BoundTerm
+}
+
+func (*BoundTransform) isTerm() {}
+func (b *BoundTransform) String() string {
+       return fmt.Sprintf("BoundTransform(transform=%s, term=%s)",
+               b.transform, b.term)
+}
+
+func (b *BoundTransform) Ref() BoundReference { return b.term.Ref() }
+func (b *BoundTransform) Type() Type          { return 
b.transform.ResultType(b.term.Type()) }
+
+func (b *BoundTransform) Equals(other BoundTerm) bool {
+       rhs, ok := other.(*BoundTransform)
+       if !ok {
+               return false
+       }
+
+       return b.transform.Equals(rhs.transform) && b.term.Equals(rhs.term)
+}
+
+func (b *BoundTransform) evalToLiteral(st structLike) Optional[Literal] {
+       return b.transform.Apply(b.term.evalToLiteral(st))
+}
+
+func (b *BoundTransform) evalIsNull(st structLike) bool {
+       return !b.evalToLiteral(st).Valid
+}
diff --git a/go.mod b/go.mod
index 49dafc3..8a344c5 100644
--- a/go.mod
+++ b/go.mod
@@ -32,6 +32,7 @@ require (
        github.com/hamba/avro/v2 v2.23.0
        github.com/pterm/pterm v0.12.79
        github.com/stretchr/testify v1.9.0
+       github.com/twmb/murmur3 v1.1.8
        github.com/wolfeidau/s3iofs v1.5.2
        golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
 )
diff --git a/go.sum b/go.sum
index 33f11eb..5fd4ae1 100644
--- a/go.sum
+++ b/go.sum
@@ -130,6 +130,8 @@ github.com/stretchr/testify v1.6.1/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.9.0 
h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+github.com/twmb/murmur3 v1.1.8 h1:8Yt9taO/WN3l08xErzjeschgZU2QSrwm1kclYq+0aRg=
+github.com/twmb/murmur3 v1.1.8/go.mod 
h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
 github.com/wolfeidau/s3iofs v1.5.2 
h1:2dmzSxdrSY29GsILVheJqBbURVQX3KglggSBtVWCYj4=
 github.com/wolfeidau/s3iofs v1.5.2/go.mod 
h1:fPAKzdWmZ1Z2L9vnqL6d1eb7pVsUgkUstxQUkG1HIHA=
 github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod 
h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
diff --git a/io/s3.go b/io/s3.go
index 6702080..7396130 100644
--- a/io/s3.go
+++ b/io/s3.go
@@ -64,9 +64,13 @@ func createS3FileIO(parsed *url.URL, props 
map[string]string) (IO, error) {
        }
 
        if region, ok := props[S3Region]; ok {
-               opts = append(opts, func(o *s3.Options) { o.Region = region })
+               opts = append(opts, func(o *s3.Options) {
+                       o.Region = region
+               })
        } else if region, ok := props["client.region"]; ok {
-               opts = append(opts, func(o *s3.Options) { o.Region = region })
+               opts = append(opts, func(o *s3.Options) {
+                       o.Region = region
+               })
        }
 
        accessKey, secretAccessKey := props[S3AccessKeyID], 
props[S3SecretAccessKey]
diff --git a/manifest.go b/manifest.go
index 57465f1..9a0d8b7 100644
--- a/manifest.go
+++ b/manifest.go
@@ -932,8 +932,6 @@ type DataFile interface {
        // SortOrderID returns the id representing the sort order for this
        // file, or nil if there is no sort order.
        SortOrderID() *int
-
-       setFieldNameToIDMap(map[string]int)
 }
 
 // ManifestEntry is an interface for both v1 and v2 manifest entries.
diff --git a/table/evaluators.go b/table/evaluators.go
index 343c8ef..1458c98 100644
--- a/table/evaluators.go
+++ b/table/evaluators.go
@@ -18,6 +18,10 @@
 package table
 
 import (
+       "fmt"
+       "math"
+       "slices"
+
        "github.com/apache/iceberg-go"
        "github.com/google/uuid"
 )
@@ -61,6 +65,45 @@ func (m *manifestEvalVisitor) Eval(manifest 
iceberg.ManifestFile) (bool, error)
        return rowsMightMatch, nil
 }
 
+func removeBoundCmp[T iceberg.LiteralType](bound iceberg.Literal, vals 
[]iceberg.Literal, cmpToDelete int) []iceberg.Literal {
+       val := bound.(iceberg.TypedLiteral[T])
+       cmp := val.Comparator()
+
+       return slices.DeleteFunc(vals, func(v iceberg.Literal) bool {
+               return cmp(val.Value(), v.(iceberg.TypedLiteral[T]).Value()) == 
cmpToDelete
+       })
+}
+
+func removeBoundCheck(bound iceberg.Literal, vals []iceberg.Literal, toDelete 
int) []iceberg.Literal {
+       switch bound.Type().(type) {
+       case iceberg.BooleanType:
+               return removeBoundCmp[bool](bound, vals, toDelete)
+       case iceberg.Int32Type:
+               return removeBoundCmp[int32](bound, vals, toDelete)
+       case iceberg.Int64Type:
+               return removeBoundCmp[int64](bound, vals, toDelete)
+       case iceberg.Float32Type:
+               return removeBoundCmp[float32](bound, vals, toDelete)
+       case iceberg.Float64Type:
+               return removeBoundCmp[float64](bound, vals, toDelete)
+       case iceberg.DateType:
+               return removeBoundCmp[iceberg.Date](bound, vals, toDelete)
+       case iceberg.TimeType:
+               return removeBoundCmp[iceberg.Time](bound, vals, toDelete)
+       case iceberg.TimestampType, iceberg.TimestampTzType:
+               return removeBoundCmp[iceberg.Timestamp](bound, vals, toDelete)
+       case iceberg.BinaryType, iceberg.FixedType:
+               return removeBoundCmp[[]byte](bound, vals, toDelete)
+       case iceberg.StringType:
+               return removeBoundCmp[string](bound, vals, toDelete)
+       case iceberg.UUIDType:
+               return removeBoundCmp[uuid.UUID](bound, vals, toDelete)
+       case iceberg.DecimalType:
+               return removeBoundCmp[iceberg.Decimal](bound, vals, toDelete)
+       }
+       panic("unrecognized type")
+}
+
 func allBoundCmp[T iceberg.LiteralType](bound iceberg.Literal, set 
iceberg.Set[iceberg.Literal], want int) bool {
        val := bound.(iceberg.TypedLiteral[T])
        cmp := val.Comparator()
@@ -488,3 +531,595 @@ func (m *manifestEvalVisitor) VisitBound(pred 
iceberg.BoundPredicate) bool {
 func (m *manifestEvalVisitor) VisitNot(child bool) bool       { return !child }
 func (m *manifestEvalVisitor) VisitAnd(left, right bool) bool { return left && 
right }
 func (m *manifestEvalVisitor) VisitOr(left, right bool) bool  { return left || 
right }
+
+type projectionEvaluator struct {
+       spec          iceberg.PartitionSpec
+       schema        *iceberg.Schema
+       caseSensitive bool
+}
+
+func (*projectionEvaluator) VisitTrue() iceberg.BooleanExpression  { return 
iceberg.AlwaysTrue{} }
+func (*projectionEvaluator) VisitFalse() iceberg.BooleanExpression { return 
iceberg.AlwaysFalse{} }
+func (*projectionEvaluator) VisitNot(child iceberg.BooleanExpression) 
iceberg.BooleanExpression {
+       panic(fmt.Errorf("%w: cannot project 'not' expression, should be 
rewritten %s",
+               iceberg.ErrInvalidArgument, child))
+}
+
+func (*projectionEvaluator) VisitAnd(left, right iceberg.BooleanExpression) 
iceberg.BooleanExpression {
+       return iceberg.NewAnd(left, right)
+}
+
+func (*projectionEvaluator) VisitOr(left, right iceberg.BooleanExpression) 
iceberg.BooleanExpression {
+       return iceberg.NewOr(left, right)
+}
+
+func (*projectionEvaluator) VisitUnbound(pred iceberg.UnboundPredicate) 
iceberg.BooleanExpression {
+       panic(fmt.Errorf("%w: cannot project unbound predicate: %s", 
iceberg.ErrInvalidArgument, pred))
+}
+
+type inclusiveProjection struct{ projectionEvaluator }
+
+func (p *inclusiveProjection) Project(expr iceberg.BooleanExpression) 
(iceberg.BooleanExpression, error) {
+       expr, err := iceberg.RewriteNotExpr(expr)
+       if err != nil {
+               return nil, err
+       }
+
+       bound, err := iceberg.BindExpr(p.schema, expr, p.caseSensitive)
+       if err != nil {
+               return nil, err
+       }
+
+       return iceberg.VisitExpr(bound, p)
+}
+
+func (p *inclusiveProjection) VisitBound(pred iceberg.BoundPredicate) 
iceberg.BooleanExpression {
+       parts := p.spec.FieldsBySourceID(pred.Term().Ref().Field().ID)
+
+       var result iceberg.BooleanExpression = iceberg.AlwaysTrue{}
+       for _, part := range parts {
+               // consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
+               // projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
+               // any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
+               //
+               // similarly, if partitioning by day(ts) and hour(ts), the more restrictive
+               // projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
+               // hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
+               inclProjection, err := part.Transform.Project(part.Name, pred)
+               if err != nil {
+                       panic(err)
+               }
+               if inclProjection != nil {
+                       result = iceberg.NewAnd(result, inclProjection)
+               }
+       }
+
+       return result
+}
+
+func newInclusiveProjection(s *iceberg.Schema, spec iceberg.PartitionSpec, 
caseSensitive bool) func(iceberg.BooleanExpression) (iceberg.BooleanExpression, 
error) {
+       return (&inclusiveProjection{
+               projectionEvaluator: projectionEvaluator{
+                       schema:        s,
+                       spec:          spec,
+                       caseSensitive: caseSensitive,
+               },
+       }).Project
+}
+
+type metricsEvaluator struct {
+       valueCounts map[int]int64
+       nullCounts  map[int]int64
+       nanCounts   map[int]int64
+       lowerBounds map[int][]byte
+       upperBounds map[int][]byte
+}
+
+func (m *metricsEvaluator) VisitTrue() bool  { return rowsMightMatch }
+func (m *metricsEvaluator) VisitFalse() bool { return rowsCannotMatch }
+func (m *metricsEvaluator) VisitNot(child bool) bool {
+       panic(fmt.Errorf("%w: NOT should be rewritten %v", 
iceberg.ErrInvalidArgument, child))
+}
+func (m *metricsEvaluator) VisitAnd(left, right bool) bool { return left && 
right }
+func (m *metricsEvaluator) VisitOr(left, right bool) bool  { return left || 
right }
+
+func (m *metricsEvaluator) containsNullsOnly(id int) bool {
+       valCount, ok := m.valueCounts[id]
+       if !ok {
+               return false
+       }
+
+       nullCount, ok := m.nullCounts[id]
+       if !ok {
+               return false
+       }
+
+       return valCount == nullCount
+}
+
+func (m *metricsEvaluator) containsNansOnly(id int) bool {
+       nanCount, ok := m.nanCounts[id]
+       if !ok {
+               return false
+       }
+
+       valCount, ok := m.valueCounts[id]
+       if !ok {
+               return false
+       }
+
+       return nanCount == valCount
+}
+
+func (m *metricsEvaluator) isNan(v iceberg.Literal) bool {
+       switch v := v.(type) {
+       case iceberg.Float32Literal:
+               return math.IsNaN(float64(v))
+       case iceberg.Float64Literal:
+               return math.IsNaN(float64(v))
+       default:
+               return false
+       }
+}
+
+func newInclusiveMetricsEvaluator(s *iceberg.Schema, expr 
iceberg.BooleanExpression,
+       caseSensitive bool, includeEmptyFiles bool) (func(iceberg.DataFile) 
(bool, error), error) {
+
+       rewritten, err := iceberg.RewriteNotExpr(expr)
+       if err != nil {
+               return nil, err
+       }
+
+       bound, err := iceberg.BindExpr(s, rewritten, caseSensitive)
+       if err != nil {
+               return nil, err
+       }
+
+       return (&inclusiveMetricsEval{
+               st:                s.AsStruct(),
+               includeEmptyFiles: includeEmptyFiles,
+               expr:              bound,
+       }).Eval, nil
+}
+
+type inclusiveMetricsEval struct {
+       metricsEvaluator
+
+       st                iceberg.StructType
+       expr              iceberg.BooleanExpression
+       includeEmptyFiles bool
+}
+
+// Eval reports whether rows in file might match the evaluator's expression.
+//
+// A file whose Count is zero yields rowsCannotMatch unless includeEmptyFiles
+// is set. Otherwise the file's value, null and NaN counts and its lower and
+// upper bounds are copied onto the receiver and the bound expression is
+// visited with iceberg.VisitExpr.
+//
+// NOTE(review): the metrics are stored on the receiver, so concurrent calls
+// on one evaluator look unsafe - confirm against callers.
+func (m *inclusiveMetricsEval) Eval(file iceberg.DataFile) (bool, error) {
+       if !m.includeEmptyFiles {
+               if file.Count() == 0 {
+                       return rowsCannotMatch, nil
+               }
+       }
+
+       m.valueCounts = file.ValueCounts()
+       m.nullCounts = file.NullValueCounts()
+       m.nanCounts = file.NaNValueCounts()
+       m.lowerBounds = file.LowerBoundValues()
+       m.upperBounds = file.UpperBoundValues()
+
+       return iceberg.VisitExpr(m.expr, m)
+}
+
+// mayContainNull returns true when no null-count map is available at all;
+// otherwise it reports whether the map has an entry for fieldID.
+func (m *inclusiveMetricsEval) mayContainNull(fieldID int) bool {
+       haveCounts := m.nullCounts != nil
+       // indexing a nil map is safe and simply reports "not present"
+       _, present := m.nullCounts[fieldID]
+
+       return !haveCounts || present
+}
+
+// VisitUnbound always panics: this evaluator only handles bound predicates.
+func (m *inclusiveMetricsEval) VisitUnbound(_ iceberg.UnboundPredicate) bool {
+       panic("need bound predicate")
+}
+
+// VisitBound dispatches a bound predicate to the matching Visit* method of
+// this evaluator through iceberg.VisitBoundPredicate.
+func (m *inclusiveMetricsEval) VisitBound(pred iceberg.BoundPredicate) bool {
+       mightMatch := iceberg.VisitBoundPredicate(pred, m)
+
+       return mightMatch
+}
+
+// VisitIsNull returns rowsCannotMatch only when a null count is recorded for
+// the term's field and that count is zero; otherwise rows might match.
+func (m *inclusiveMetricsEval) VisitIsNull(t iceberg.BoundTerm) bool {
+       nulls, known := m.nullCounts[t.Ref().Field().ID]
+       if known && nulls == 0 {
+               return rowsCannotMatch
+       }
+
+       return rowsMightMatch
+}
+
+// VisitNotNull returns rowsCannotMatch when the term's field holds only
+// nulls, and rowsMightMatch otherwise.
+func (m *inclusiveMetricsEval) VisitNotNull(t iceberg.BoundTerm) bool {
+       // There is no need to check whether the field is required, because
+       // binding evaluates that case. If the column has no non-null values,
+       // the expression cannot match.
+       if id := t.Ref().Field().ID; m.containsNullsOnly(id) {
+               return rowsCannotMatch
+       }
+
+       return rowsMightMatch
+}
+
+// VisitIsNan returns rowsCannotMatch when the recorded NaN count for the
+// term's field is zero, or when the field holds only nulls; otherwise rows
+// might match.
+func (m *inclusiveMetricsEval) VisitIsNan(t iceberg.BoundTerm) bool {
+       id := t.Ref().Field().ID
+       nans, known := m.nanCounts[id]
+
+       switch {
+       case known && nans == 0:
+               return rowsCannotMatch
+       case m.containsNullsOnly(id):
+               // even without NaN counts, a column that holds only nulls
+               // is guaranteed to hold no NaN value
+               return rowsCannotMatch
+       default:
+               return rowsMightMatch
+       }
+}
+
+// VisitNotNan returns rowsCannotMatch when every value of the term's field
+// is NaN, and rowsMightMatch otherwise.
+func (m *inclusiveMetricsEval) VisitNotNan(t iceberg.BoundTerm) bool {
+       if id := t.Ref().Field().ID; m.containsNansOnly(id) {
+               return rowsCannotMatch
+       }
+
+       return rowsMightMatch
+}
+
+// VisitLess reports whether the file might hold a value of the term's field
+// that is strictly less than lit.
+//
+// It returns rowsCannotMatch when the field holds only nulls or only NaNs, or
+// when the decoded lower bound is greater than or equal to lit. A missing or
+// NaN lower bound yields rowsMightMatch. It panics if the field type is not
+// an iceberg.PrimitiveType or the lower bound cannot be decoded.
+func (m *inclusiveMetricsEval) VisitLess(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       fld := t.Ref().Field()
+
+       if m.containsNullsOnly(fld.ID) || m.containsNansOnly(fld.ID) {
+               return rowsCannotMatch
+       }
+
+       if _, isPrimitive := fld.Type.(iceberg.PrimitiveType); !isPrimitive {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, fld.Type))
+       }
+
+       raw := m.lowerBounds[fld.ID]
+       if raw == nil {
+               return rowsMightMatch
+       }
+
+       lower, err := iceberg.LiteralFromBytes(fld.Type, raw)
+       if err != nil {
+               panic(err)
+       }
+
+       switch {
+       case m.isNan(lower):
+               // nan indicates unreliable bounds
+               return rowsMightMatch
+       case getCmpLiteral(lower)(lower, lit) >= 0:
+               return rowsCannotMatch
+       }
+
+       return rowsMightMatch
+}
+
+// VisitLessEqual reports whether the file might hold a value of the term's
+// field that is less than or equal to lit.
+//
+// It returns rowsCannotMatch when the field holds only nulls or only NaNs, or
+// when the decoded lower bound is strictly greater than lit. A missing or NaN
+// lower bound yields rowsMightMatch. It panics if the field type is not an
+// iceberg.PrimitiveType or the lower bound cannot be decoded.
+func (m *inclusiveMetricsEval) VisitLessEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       fld := t.Ref().Field()
+
+       if m.containsNullsOnly(fld.ID) || m.containsNansOnly(fld.ID) {
+               return rowsCannotMatch
+       }
+
+       if _, isPrimitive := fld.Type.(iceberg.PrimitiveType); !isPrimitive {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, fld.Type))
+       }
+
+       raw := m.lowerBounds[fld.ID]
+       if raw == nil {
+               return rowsMightMatch
+       }
+
+       lower, err := iceberg.LiteralFromBytes(fld.Type, raw)
+       if err != nil {
+               panic(err)
+       }
+
+       switch {
+       case m.isNan(lower):
+               // nan indicates unreliable bounds
+               return rowsMightMatch
+       case getCmpLiteral(lower)(lower, lit) > 0:
+               return rowsCannotMatch
+       }
+
+       return rowsMightMatch
+}
+
+// VisitGreater reports whether the file might hold a value of the term's
+// field that is strictly greater than lit.
+//
+// It returns rowsCannotMatch when the field holds only nulls or only NaNs, or
+// when the decoded upper bound is less than or equal to lit and is not NaN.
+// A missing upper bound yields rowsMightMatch. It panics if the field type is
+// not an iceberg.PrimitiveType or the upper bound cannot be decoded.
+func (m *inclusiveMetricsEval) VisitGreater(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       fld := t.Ref().Field()
+
+       if m.containsNullsOnly(fld.ID) || m.containsNansOnly(fld.ID) {
+               return rowsCannotMatch
+       }
+
+       if _, isPrimitive := fld.Type.(iceberg.PrimitiveType); !isPrimitive {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, fld.Type))
+       }
+
+       raw := m.upperBounds[fld.ID]
+       if raw == nil {
+               return rowsMightMatch
+       }
+
+       upper, err := iceberg.LiteralFromBytes(fld.Type, raw)
+       if err != nil {
+               panic(err)
+       }
+
+       if getCmpLiteral(upper)(upper, lit) > 0 {
+               return rowsMightMatch
+       }
+
+       // the bound rules the literal out, unless the bound itself is NaN
+       if m.isNan(upper) {
+               return rowsMightMatch
+       }
+
+       return rowsCannotMatch
+}
+
+// VisitGreaterEqual reports whether the file might hold a value of the
+// term's field that is greater than or equal to lit.
+//
+// It returns rowsCannotMatch when the field holds only nulls or only NaNs, or
+// when the decoded upper bound is strictly less than lit and is not NaN. A
+// missing upper bound yields rowsMightMatch. It panics if the field type is
+// not an iceberg.PrimitiveType or the upper bound cannot be decoded.
+func (m *inclusiveMetricsEval) VisitGreaterEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       fld := t.Ref().Field()
+
+       if m.containsNullsOnly(fld.ID) || m.containsNansOnly(fld.ID) {
+               return rowsCannotMatch
+       }
+
+       if _, isPrimitive := fld.Type.(iceberg.PrimitiveType); !isPrimitive {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, fld.Type))
+       }
+
+       raw := m.upperBounds[fld.ID]
+       if raw == nil {
+               return rowsMightMatch
+       }
+
+       upper, err := iceberg.LiteralFromBytes(fld.Type, raw)
+       if err != nil {
+               panic(err)
+       }
+
+       if getCmpLiteral(upper)(upper, lit) >= 0 {
+               return rowsMightMatch
+       }
+
+       // the bound rules the literal out, unless the bound itself is NaN
+       if m.isNan(upper) {
+               return rowsMightMatch
+       }
+
+       return rowsCannotMatch
+}
+
+// VisitEqual reports whether the file might hold a value of the term's field
+// that equals lit.
+//
+// It returns rowsCannotMatch when the field holds only nulls or only NaNs,
+// when the decoded lower bound is greater than lit, or when the decoded upper
+// bound is less than lit. A NaN bound yields rowsMightMatch, and a missing
+// bound is simply skipped. It panics if the field type is not an
+// iceberg.PrimitiveType or a bound cannot be decoded.
+func (m *inclusiveMetricsEval) VisitEqual(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       field := t.Ref().Field()
+       fieldID := field.ID
+
+       if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+               return rowsCannotMatch
+       }
+
+       if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, field.Type))
+       }
+
+       if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+               lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes)
+               if err != nil {
+                       panic(err)
+               }
+
+               if m.isNan(lowerBound) {
+                       // nan indicates unreliable bounds
+                       return rowsMightMatch
+               }
+
+               // test the sign rather than an exact +1, as the sibling
+               // visitors do
+               if getCmpLiteral(lowerBound)(lowerBound, lit) > 0 {
+                       return rowsCannotMatch
+               }
+       }
+
+       if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+               upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes)
+               if err != nil {
+                       panic(err)
+               }
+
+               if m.isNan(upperBound) {
+                       // nan indicates unreliable bounds
+                       return rowsMightMatch
+               }
+
+               // the comparator is fetched here rather than reused from the
+               // lower-bound branch: a file with an upper bound but no lower
+               // bound would otherwise call a nil function
+               if getCmpLiteral(upperBound)(upperBound, lit) < 0 {
+                       return rowsCannotMatch
+               }
+       }
+
+       return rowsMightMatch
+}
+
+// VisitNotEqual always returns rowsMightMatch; neither the term nor the
+// literal is inspected.
+func (m *inclusiveMetricsEval) VisitNotEqual(_ iceberg.BoundTerm, _ iceberg.Literal) bool {
+       return rowsMightMatch
+}
+
+// VisitIn reports whether the file might hold a value of the term's field
+// that is a member of the set s.
+//
+// It returns rowsCannotMatch when the field holds only nulls or only NaNs, or
+// when filtering the set members against the lower and then the upper bound
+// with removeBoundCheck leaves no candidates. Sets larger than
+// inPredicateLimit are not evaluated, and a NaN bound makes the bounds
+// unusable; both cases yield rowsMightMatch. It panics if the field type is
+// not an iceberg.PrimitiveType or a bound cannot be decoded.
+func (m *inclusiveMetricsEval) VisitIn(t iceberg.BoundTerm, s iceberg.Set[iceberg.Literal]) bool {
+       field := t.Ref().Field()
+       fieldID := field.ID
+
+       if m.containsNullsOnly(fieldID) || m.containsNansOnly(fieldID) {
+               return rowsCannotMatch
+       }
+
+       if s.Len() > inPredicateLimit {
+               // skip evaluating the predicate if the number of values is too big
+               return rowsMightMatch
+       }
+
+       if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, field.Type))
+       }
+
+       values := s.Members()
+       if lowerBoundBytes := m.lowerBounds[fieldID]; lowerBoundBytes != nil {
+               lowerBound, err := iceberg.LiteralFromBytes(field.Type, lowerBoundBytes)
+               if err != nil {
+                       // panic with the error itself; the literal result is
+                       // meaningless when decoding failed
+                       panic(err)
+               }
+
+               if m.isNan(lowerBound) {
+                       // nan indicates unreliable bounds
+                       return rowsMightMatch
+               }
+
+               values = removeBoundCheck(lowerBound, values, 1)
+               if len(values) == 0 {
+                       return rowsCannotMatch
+               }
+       }
+
+       if upperBoundBytes := m.upperBounds[fieldID]; upperBoundBytes != nil {
+               upperBound, err := iceberg.LiteralFromBytes(field.Type, upperBoundBytes)
+               if err != nil {
+                       panic(err)
+               }
+
+               if m.isNan(upperBound) {
+                       // nan indicates unreliable bounds
+                       return rowsMightMatch
+               }
+
+               values = removeBoundCheck(upperBound, values, -1)
+               if len(values) == 0 {
+                       return rowsCannotMatch
+               }
+       }
+
+       return rowsMightMatch
+}
+
+// VisitNotIn always returns rowsMightMatch; neither the term nor the set is
+// inspected.
+func (m *inclusiveMetricsEval) VisitNotIn(_ iceberg.BoundTerm, _ iceberg.Set[iceberg.Literal]) bool {
+       // Because the bounds are not necessarily a min or max value, this
+       // cannot be answered using them: notIn(col, {X, ...}) with bounds
+       // (X, Y) doesn't guarantee that X is a value in col.
+       return rowsMightMatch
+}
+
+// VisitStartsWith reports whether the file might hold a value of the term's
+// field that starts with the prefix carried by lit.
+//
+// The prefix is read from lit as a string, or else as a []byte converted to a
+// string. Each available bound is cut to at most the prefix length and
+// compared with the prefix: a non-empty cut lower bound greater than the
+// prefix, or a non-empty cut upper bound less than it, yields
+// rowsCannotMatch. A field holding only nulls also cannot match. It panics if
+// the field type is not an iceberg.PrimitiveType, if a bound cannot be
+// decoded, or if lit is neither a string nor a []byte literal.
+func (m *inclusiveMetricsEval) VisitStartsWith(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       fld := t.Ref().Field()
+
+       if m.containsNullsOnly(fld.ID) {
+               return rowsCannotMatch
+       }
+
+       if _, isPrimitive := fld.Type.(iceberg.PrimitiveType); !isPrimitive {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, fld.Type))
+       }
+
+       var prefix string
+       switch p := lit.(type) {
+       case iceberg.TypedLiteral[string]:
+               prefix = p.Value()
+       default:
+               prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value())
+       }
+
+       // clip renders a bound as a string no longer than the prefix; bounds
+       // that are neither string nor []byte literals render as ""
+       clip := func(bound iceberg.Literal) string {
+               var s string
+               switch b := bound.(type) {
+               case iceberg.TypedLiteral[string]:
+                       s = b.Value()
+               case iceberg.TypedLiteral[[]byte]:
+                       s = string(b.Value())
+               }
+
+               if len(s) > len(prefix) {
+                       s = s[:len(prefix)]
+               }
+
+               return s
+       }
+
+       if raw := m.lowerBounds[fld.ID]; raw != nil {
+               lower, err := iceberg.LiteralFromBytes(fld.Type, raw)
+               if err != nil {
+                       panic(err)
+               }
+
+               if v := clip(lower); v != "" && v > prefix {
+                       return rowsCannotMatch
+               }
+       }
+
+       if raw := m.upperBounds[fld.ID]; raw != nil {
+               upper, err := iceberg.LiteralFromBytes(fld.Type, raw)
+               if err != nil {
+                       panic(err)
+               }
+
+               if v := clip(upper); v != "" && v < prefix {
+                       return rowsCannotMatch
+               }
+       }
+
+       return rowsMightMatch
+}
+
+// VisitNotStartsWith reports whether the file might hold a value of the
+// term's field that does not start with the prefix carried by lit.
+//
+// It returns rowsMightMatch whenever mayContainNull is true for the field or
+// either bound is missing. Otherwise it returns rowsCannotMatch only when
+// both the lower and the upper bound start with the prefix. It panics if the
+// field type is not an iceberg.PrimitiveType, if a bound cannot be decoded,
+// or if the literal and bounds are not all string (or all []byte) literals.
+func (m *inclusiveMetricsEval) VisitNotStartsWith(t iceberg.BoundTerm, lit iceberg.Literal) bool {
+       fld := t.Ref().Field()
+
+       if m.mayContainNull(fld.ID) {
+               return rowsMightMatch
+       }
+
+       if _, isPrimitive := fld.Type.(iceberg.PrimitiveType); !isPrimitive {
+               panic(fmt.Errorf("%w: expected iceberg.PrimitiveType, got %s",
+                       iceberg.ErrInvalidTypeString, fld.Type))
+       }
+
+       // not_starts_with will match unless all values must start with the
+       // prefix, which happens when the lower and upper bounds both start
+       // with the prefix
+       rawLower, rawUpper := m.lowerBounds[fld.ID], m.upperBounds[fld.ID]
+       if rawLower == nil || rawUpper == nil {
+               return rowsMightMatch
+       }
+
+       lowerLit, err := iceberg.LiteralFromBytes(fld.Type, rawLower)
+       if err != nil {
+               panic(err)
+       }
+
+       upperLit, err := iceberg.LiteralFromBytes(fld.Type, rawUpper)
+       if err != nil {
+               panic(err)
+       }
+
+       var prefix, lower, upper string
+       if strLit, isString := lit.(iceberg.TypedLiteral[string]); isString {
+               prefix = strLit.Value()
+               lower = lowerLit.(iceberg.TypedLiteral[string]).Value()
+               upper = upperLit.(iceberg.TypedLiteral[string]).Value()
+       } else {
+               prefix = string(lit.(iceberg.TypedLiteral[[]byte]).Value())
+               lower = string(lowerLit.(iceberg.TypedLiteral[[]byte]).Value())
+               upper = string(upperLit.(iceberg.TypedLiteral[[]byte]).Value())
+       }
+
+       n := len(prefix)
+       lowerHasPrefix := len(lower) >= n && lower[:n] == prefix
+       upperHasPrefix := len(upper) >= n && upper[:n] == prefix
+       if lowerHasPrefix && upperHasPrefix {
+               return rowsCannotMatch
+       }
+
+       return rowsMightMatch
+}
diff --git a/table/evaluators_test.go b/table/evaluators_test.go
index b8c2671..a543e93 100644
--- a/table/evaluators_test.go
+++ b/table/evaluators_test.go
@@ -18,11 +18,13 @@
 package table
 
 import (
+       "math"
        "testing"
 
        "github.com/apache/iceberg-go"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+       "github.com/stretchr/testify/suite"
 )
 
 const (
@@ -503,3 +505,1295 @@ func TestManifestEvaluator(t *testing.T) {
                }
        })
 }
+
+// ProjectionTestSuite groups the tests that exercise newInclusiveProjection
+// against the partition specs built by its helper methods. It embeds
+// suite.Suite and carries no state of its own.
+type ProjectionTestSuite struct {
+       suite.Suite
+}
+
+// schema returns the four-column table schema (id, data, event_date,
+// event_ts) that every projection test binds against.
+func (*ProjectionTestSuite) schema() *iceberg.Schema {
+       fields := []iceberg.NestedField{
+               {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64},
+               {ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String},
+               {ID: 3, Name: "event_date", Type: iceberg.PrimitiveTypes.Date},
+               {ID: 4, Name: "event_ts", Type: iceberg.PrimitiveTypes.Timestamp},
+       }
+
+       return iceberg.NewSchema(0, fields...)
+}
+
+// emptySpec returns a partition spec with no partition fields.
+func (*ProjectionTestSuite) emptySpec() iceberg.PartitionSpec {
+       unpartitioned := iceberg.NewPartitionSpec()
+
+       return unpartitioned
+}
+
+// idSpec returns a spec with one identity partition field, "id_part", over
+// source column 1.
+func (*ProjectionTestSuite) idSpec() iceberg.PartitionSpec {
+       idPart := iceberg.PartitionField{
+               SourceID:  1,
+               FieldID:   1000,
+               Name:      "id_part",
+               Transform: iceberg.IdentityTransform{},
+       }
+
+       return iceberg.NewPartitionSpec(idPart)
+}
+
+// bucketSpec returns a spec with one 16-bucket partition field,
+// "data_bucket", over source column 2.
+func (*ProjectionTestSuite) bucketSpec() iceberg.PartitionSpec {
+       dataBucket := iceberg.PartitionField{
+               SourceID:  2,
+               FieldID:   1000,
+               Name:      "data_bucket",
+               Transform: iceberg.BucketTransform{NumBuckets: 16},
+       }
+
+       return iceberg.NewPartitionSpec(dataBucket)
+}
+
+// daySpec returns a spec with two day-transform partition fields: "date"
+// over source column 4 and "ddate" over source column 3.
+func (*ProjectionTestSuite) daySpec() iceberg.PartitionSpec {
+       tsDay := iceberg.PartitionField{
+               SourceID:  4,
+               FieldID:   1000,
+               Name:      "date",
+               Transform: iceberg.DayTransform{},
+       }
+       dateDay := iceberg.PartitionField{
+               SourceID:  3,
+               FieldID:   1001,
+               Name:      "ddate",
+               Transform: iceberg.DayTransform{},
+       }
+
+       return iceberg.NewPartitionSpec(tsDay, dateDay)
+}
+
+// hourSpec returns a spec with one hour-transform partition field, "hour",
+// over source column 4.
+func (*ProjectionTestSuite) hourSpec() iceberg.PartitionSpec {
+       tsHour := iceberg.PartitionField{
+               SourceID:  4,
+               FieldID:   1000,
+               Name:      "hour",
+               Transform: iceberg.HourTransform{},
+       }
+
+       return iceberg.NewPartitionSpec(tsHour)
+}
+
+// truncateStrSpec returns a spec with one width-2 truncate partition field,
+// "data_trunc", over source column 2.
+func (*ProjectionTestSuite) truncateStrSpec() iceberg.PartitionSpec {
+       dataTrunc := iceberg.PartitionField{
+               SourceID:  2,
+               FieldID:   1000,
+               Name:      "data_trunc",
+               Transform: iceberg.TruncateTransform{Width: 2},
+       }
+
+       return iceberg.NewPartitionSpec(dataTrunc)
+}
+
+// truncateIntSpec returns a spec with one width-10 truncate partition field,
+// "id_trunc", over source column 1.
+func (*ProjectionTestSuite) truncateIntSpec() iceberg.PartitionSpec {
+       idTrunc := iceberg.PartitionField{
+               SourceID:  1,
+               FieldID:   1000,
+               Name:      "id_trunc",
+               Transform: iceberg.TruncateTransform{Width: 10},
+       }
+
+       return iceberg.NewPartitionSpec(idTrunc)
+}
+
+// idAndBucketSpec returns a spec with an identity field "id_part" over source
+// column 1 and a 16-bucket field "data_bucket" over source column 2.
+func (*ProjectionTestSuite) idAndBucketSpec() iceberg.PartitionSpec {
+       idPart := iceberg.PartitionField{
+               SourceID:  1,
+               FieldID:   1000,
+               Name:      "id_part",
+               Transform: iceberg.IdentityTransform{},
+       }
+       dataBucket := iceberg.PartitionField{
+               SourceID:  2,
+               FieldID:   1001,
+               Name:      "data_bucket",
+               Transform: iceberg.BucketTransform{NumBuckets: 16},
+       }
+
+       return iceberg.NewPartitionSpec(idPart, dataBucket)
+}
+
+// TestIdentityProjection checks that each predicate on "id" is projected to
+// the same predicate on the identity partition field "id_part".
+func (p *ProjectionTestSuite) TestIdentityProjection() {
+       src, part := iceberg.Reference("id"), iceberg.Reference("id_part")
+
+       cases := []struct {
+               pred     iceberg.BooleanExpression
+               expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, int64(100)), iceberg.LessThan(part, int64(100))},
+               {iceberg.LessThanEqual(src, int64(101)), iceberg.LessThanEqual(part, int64(101))},
+               {iceberg.GreaterThan(src, int64(102)), iceberg.GreaterThan(part, int64(102))},
+               {iceberg.GreaterThanEqual(src, int64(103)), iceberg.GreaterThanEqual(part, int64(103))},
+               {iceberg.EqualTo(src, int64(104)), iceberg.EqualTo(part, int64(104))},
+               {iceberg.NotEqualTo(src, int64(105)), iceberg.NotEqualTo(part, int64(105))},
+               {iceberg.IsIn(src, int64(3), 4, 5), iceberg.IsIn(part, int64(3), 4, 5)},
+               {iceberg.NotIn(src, int64(3), 4, 5), iceberg.NotIn(part, int64(3), 4, 5)},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.idSpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestBucketProjection checks the projection of predicates on "data" onto the
+// bucket partition field "data_bucket": null checks, equality and IN are
+// expected to project, while every other predicate is expected to become
+// AlwaysTrue.
+func (p *ProjectionTestSuite) TestBucketProjection() {
+       src, part := iceberg.Reference("data"), iceberg.Reference("data_bucket")
+
+       cases := []struct {
+               pred, expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, "val"), iceberg.AlwaysTrue{}},
+               {iceberg.LessThanEqual(src, "val"), iceberg.AlwaysTrue{}},
+               {iceberg.GreaterThan(src, "val"), iceberg.AlwaysTrue{}},
+               {iceberg.GreaterThanEqual(src, "val"), iceberg.AlwaysTrue{}},
+               {iceberg.EqualTo(src, "val"), iceberg.EqualTo(part, int32(14))},
+               {iceberg.NotEqualTo(src, "val"), iceberg.AlwaysTrue{}},
+               {iceberg.IsIn(src, "v1", "v2", "v3"), iceberg.IsIn(part, int32(1), 3, 13)},
+               {iceberg.NotIn(src, "v1", "v2", "v3"), iceberg.AlwaysTrue{}},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.bucketSpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestHourProjection checks the projection of predicates on "event_ts" onto
+// the hour partition field "hour".
+func (p *ProjectionTestSuite) TestHourProjection() {
+       src, part := iceberg.Reference("event_ts"), iceberg.Reference("hour")
+
+       cases := []struct {
+               pred, expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, "2022-11-27T10:00:00"), iceberg.LessThanEqual(part, int32(463761))},
+               {iceberg.LessThanEqual(src, "2022-11-27T10:00:00"), iceberg.LessThanEqual(part, int32(463762))},
+               {iceberg.GreaterThan(src, "2022-11-27T09:59:59.999999"), iceberg.GreaterThanEqual(part, int32(463762))},
+               {iceberg.GreaterThanEqual(src, "2022-11-27T09:59:59.999999"), iceberg.GreaterThanEqual(part, int32(463761))},
+               {iceberg.EqualTo(src, "2022-11-27T10:00:00"), iceberg.EqualTo(part, int32(463762))},
+               {iceberg.NotEqualTo(src, "2022-11-27T10:00:00"), iceberg.AlwaysTrue{}},
+               {iceberg.IsIn(src, "2022-11-27T10:00:00", "2022-11-27T09:59:59.999999"), iceberg.IsIn(part, int32(463761), 463762)},
+               {iceberg.NotIn(src, "2022-11-27T10:00:00", "2022-11-27T09:59:59.999999"), iceberg.AlwaysTrue{}},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.hourSpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestDayProjection checks the projection of predicates on the timestamp
+// column "event_ts" onto the day partition field "date".
+func (p *ProjectionTestSuite) TestDayProjection() {
+       src, part := iceberg.Reference("event_ts"), iceberg.Reference("date")
+
+       cases := []struct {
+               pred, expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, "2022-11-27T00:00:00"), iceberg.LessThanEqual(part, int32(19322))},
+               {iceberg.LessThanEqual(src, "2022-11-27T00:00:00"), iceberg.LessThanEqual(part, int32(19323))},
+               {iceberg.GreaterThan(src, "2022-11-26T23:59:59.999999"), iceberg.GreaterThanEqual(part, int32(19323))},
+               {iceberg.GreaterThanEqual(src, "2022-11-26T23:59:59.999999"), iceberg.GreaterThanEqual(part, int32(19322))},
+               {iceberg.EqualTo(src, "2022-11-27T10:00:00"), iceberg.EqualTo(part, int32(19323))},
+               {iceberg.NotEqualTo(src, "2022-11-27T10:00:00"), iceberg.AlwaysTrue{}},
+               {iceberg.IsIn(src, "2022-11-27T00:00:00", "2022-11-26T23:59:59.999999"), iceberg.IsIn(part, int32(19322), 19323)},
+               {iceberg.NotIn(src, "2022-11-27T00:00:00", "2022-11-26T23:59:59.999999"), iceberg.AlwaysTrue{}},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.daySpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestDateDayProjection checks the projection of predicates on the date
+// column "event_date" onto the day partition field "ddate".
+func (p *ProjectionTestSuite) TestDateDayProjection() {
+       src, part := iceberg.Reference("event_date"), iceberg.Reference("ddate")
+
+       cases := []struct {
+               pred, expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, "2022-11-27"), iceberg.LessThanEqual(part, int32(19322))},
+               {iceberg.LessThanEqual(src, "2022-11-27"), iceberg.LessThanEqual(part, int32(19323))},
+               {iceberg.GreaterThan(src, "2022-11-26"), iceberg.GreaterThanEqual(part, int32(19323))},
+               {iceberg.GreaterThanEqual(src, "2022-11-26"), iceberg.GreaterThanEqual(part, int32(19322))},
+               {iceberg.EqualTo(src, "2022-11-27"), iceberg.EqualTo(part, int32(19323))},
+               {iceberg.NotEqualTo(src, "2022-11-27"), iceberg.AlwaysTrue{}},
+               {iceberg.IsIn(src, "2022-11-27", "2022-11-26"), iceberg.IsIn(part, int32(19322), 19323)},
+               {iceberg.NotIn(src, "2022-11-27", "2022-11-26"), iceberg.AlwaysTrue{}},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.daySpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestStringTruncateProjection checks the projection of predicates on the
+// string column "data" onto the width-2 truncate field "data_trunc".
+func (p *ProjectionTestSuite) TestStringTruncateProjection() {
+       src, part := iceberg.Reference("data"), iceberg.Reference("data_trunc")
+
+       cases := []struct {
+               pred, expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, "aaa"), iceberg.LessThanEqual(part, "aa")},
+               {iceberg.LessThanEqual(src, "aaa"), iceberg.LessThanEqual(part, "aa")},
+               {iceberg.GreaterThan(src, "aaa"), iceberg.GreaterThanEqual(part, "aa")},
+               {iceberg.GreaterThanEqual(src, "aaa"), iceberg.GreaterThanEqual(part, "aa")},
+               {iceberg.EqualTo(src, "aaa"), iceberg.EqualTo(part, "aa")},
+               {iceberg.NotEqualTo(src, "aaa"), iceberg.AlwaysTrue{}},
+               {iceberg.IsIn(src, "aaa", "aab"), iceberg.EqualTo(part, "aa")},
+               {iceberg.NotIn(src, "aaa", "aab"), iceberg.AlwaysTrue{}},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.truncateStrSpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestIntTruncateProjection checks the projection of predicates on the
+// integer column "id" onto the width-10 truncate field "id_trunc".
+func (p *ProjectionTestSuite) TestIntTruncateProjection() {
+       src, part := iceberg.Reference("id"), iceberg.Reference("id_trunc")
+
+       cases := []struct {
+               pred, expected iceberg.BooleanExpression
+       }{
+               {iceberg.NotNull(src), iceberg.NotNull(part)},
+               {iceberg.IsNull(src), iceberg.IsNull(part)},
+               {iceberg.LessThan(src, int32(10)), iceberg.LessThanEqual(part, int64(0))},
+               {iceberg.LessThanEqual(src, int32(10)), iceberg.LessThanEqual(part, int64(10))},
+               {iceberg.GreaterThan(src, int32(9)), iceberg.GreaterThanEqual(part, int64(10))},
+               {iceberg.GreaterThanEqual(src, int32(10)), iceberg.GreaterThanEqual(part, int64(10))},
+               {iceberg.EqualTo(src, int32(15)), iceberg.EqualTo(part, int64(10))},
+               {iceberg.NotEqualTo(src, int32(15)), iceberg.AlwaysTrue{}},
+               {iceberg.IsIn(src, int32(15), 16), iceberg.EqualTo(part, int64(10))},
+               {iceberg.NotIn(src, int32(15), 16), iceberg.AlwaysTrue{}},
+       }
+
+       project := newInclusiveProjection(p.schema(), p.truncateIntSpec(), true)
+       for _, tc := range cases {
+               p.Run(tc.pred.String(), func() {
+                       got, err := project(tc.pred)
+                       p.Require().NoError(err)
+                       p.Truef(tc.expected.Equals(got), "expected: %s\ngot: %s", tc.expected, got)
+               })
+       }
+}
+
+// TestProjectionCaseSensitive checks that a case-sensitive projection fails
+// to bind the reference "ID" and reports iceberg.ErrInvalidSchema.
+func (p *ProjectionTestSuite) TestProjectionCaseSensitive() {
+       project := newInclusiveProjection(p.schema(), p.idSpec(), true)
+       upperCaseRef := iceberg.NotNull(iceberg.Reference("ID"))
+
+       _, err := project(upperCaseRef)
+       p.ErrorIs(err, iceberg.ErrInvalidSchema)
+       p.ErrorContains(err, "could not bind reference 'ID', caseSensitive=true")
+}
+
+// TestProjectionCaseInsensitive checks that a case-insensitive projection
+// binds the reference "ID" and projects it onto "id_part".
+func (p *ProjectionTestSuite) TestProjectionCaseInsensitive() {
+       project := newInclusiveProjection(p.schema(), p.idSpec(), false)
+       upperCaseRef := iceberg.NotNull(iceberg.Reference("ID"))
+
+       got, err := project(upperCaseRef)
+       p.Require().NoError(err)
+       p.True(got.Equals(iceberg.NotNull(iceberg.Reference("id_part"))))
+}
+
+// TestProjectEmptySpec checks that projecting onto a spec without partition
+// fields yields AlwaysTrue.
+func (p *ProjectionTestSuite) TestProjectEmptySpec() {
+       project := newInclusiveProjection(p.schema(), p.emptySpec(), true)
+       filter := iceberg.NewAnd(
+               iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+               iceberg.NotNull(iceberg.Reference("data")))
+
+       got, err := project(filter)
+       p.Require().NoError(err)
+       p.Equal(iceberg.AlwaysTrue{}, got)
+}
+
+// TestAndProjectionMultipleFields checks that both sides of an AND are
+// projected onto their respective partition fields.
+func (p *ProjectionTestSuite) TestAndProjectionMultipleFields() {
+       project := newInclusiveProjection(p.schema(), p.idAndBucketSpec(), true)
+       filter := iceberg.NewAnd(
+               iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+               iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c"))
+
+       got, err := project(filter)
+       p.Require().NoError(err)
+
+       want := iceberg.NewAnd(
+               iceberg.LessThan(iceberg.Reference("id_part"), int64(5)),
+               iceberg.IsIn(iceberg.Reference("data_bucket"), int32(2), 3, 15))
+       p.True(got.Equals(want))
+}
+
+// TestOrProjectionMultipleFields checks that both sides of an OR are
+// projected onto their respective partition fields.
+func (p *ProjectionTestSuite) TestOrProjectionMultipleFields() {
+       project := newInclusiveProjection(p.schema(), p.idAndBucketSpec(), true)
+       filter := iceberg.NewOr(
+               iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+               iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c"))
+
+       got, err := project(filter)
+       p.Require().NoError(err)
+
+       want := iceberg.NewOr(
+               iceberg.LessThan(iceberg.Reference("id_part"), int64(5)),
+               iceberg.IsIn(iceberg.Reference("data_bucket"), int32(2), 3, 15))
+       p.True(got.Equals(want))
+}
+
+// TestNotProjectionMultipleFields checks that a negated OR projects to just
+// the "id_part" comparison.
+func (p *ProjectionTestSuite) TestNotProjectionMultipleFields() {
+       project := newInclusiveProjection(p.schema(), p.idAndBucketSpec(), true)
+
+       // not causes In to be rewritten to NotIn, which cannot be projected
+       filter := iceberg.NewNot(iceberg.NewOr(
+               iceberg.LessThan(iceberg.Reference("id"), int64(5)),
+               iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c")))
+
+       got, err := project(filter)
+       p.Require().NoError(err)
+
+       want := iceberg.GreaterThanEqual(iceberg.Reference("id_part"), int64(5))
+       p.True(got.Equals(want))
+}
+
+// TestPartialProjectedFields projects an And over "id" and "data" through
+// idSpec and expects only the "id" operand to remain, rewritten onto
+// "id_part".
+func (p *ProjectionTestSuite) TestPartialProjectedFields() {
+       project := newInclusiveProjection(p.schema(), p.idSpec(), true)
+
+       filter := iceberg.NewAnd(
+               iceberg.LessThan(iceberg.Reference("id"), int32(5)),
+               iceberg.IsIn(iceberg.Reference("data"), "a", "b", "c"),
+       )
+       projected, err := project(filter)
+       p.Require().NoError(err)
+
+       want := iceberg.LessThan(iceberg.Reference("id_part"), int64(5))
+       p.True(projected.Equals(want))
+}
+
+// mockDataFile is an in-memory test double whose accessors return the
+// values stored in its fields; the metrics test fixtures assign it to
+// iceberg.DataFile values.
+type mockDataFile struct {
+       // Basic file attributes.
+       path      string
+       format    iceberg.FileFormat
+       partition map[string]any
+       count     int64
+
+       // Per-column statistics; the accessors expose these maps as-is.
+       columnSizes map[int]int64
+       filesize    int64
+       valueCounts map[int]int64
+       nullCounts  map[int]int64
+       nanCounts   map[int]int64
+       lowerBounds map[int][]byte
+       upperBounds map[int][]byte
+}
+
+// ContentType always reports data content.
+func (*mockDataFile) ContentType() iceberg.ManifestEntryContent {
+       return iceberg.EntryContentData
+}
+
+// FilePath returns the configured path.
+func (m *mockDataFile) FilePath() string {
+       return m.path
+}
+
+// FileFormat returns the configured file format.
+func (m *mockDataFile) FileFormat() iceberg.FileFormat {
+       return m.format
+}
+
+// Partition returns the configured partition map.
+func (m *mockDataFile) Partition() map[string]any {
+       return m.partition
+}
+
+// Count returns the configured record count.
+func (m *mockDataFile) Count() int64 {
+       return m.count
+}
+
+// FileSizeBytes returns the configured file size.
+func (m *mockDataFile) FileSizeBytes() int64 {
+       return m.filesize
+}
+
+// ColumnSizes returns the configured column sizes.
+func (m *mockDataFile) ColumnSizes() map[int]int64 {
+       return m.columnSizes
+}
+
+// ValueCounts returns the configured value counts.
+func (m *mockDataFile) ValueCounts() map[int]int64 {
+       return m.valueCounts
+}
+
+// NullValueCounts returns the configured null counts.
+func (m *mockDataFile) NullValueCounts() map[int]int64 {
+       return m.nullCounts
+}
+
+// NaNValueCounts returns the configured NaN counts.
+func (m *mockDataFile) NaNValueCounts() map[int]int64 {
+       return m.nanCounts
+}
+
+// DistinctValueCounts is not modelled by the mock and always returns nil.
+func (*mockDataFile) DistinctValueCounts() map[int]int64 {
+       return nil
+}
+
+// LowerBoundValues returns the configured lower bounds.
+func (m *mockDataFile) LowerBoundValues() map[int][]byte {
+       return m.lowerBounds
+}
+
+// UpperBoundValues returns the configured upper bounds.
+func (m *mockDataFile) UpperBoundValues() map[int][]byte {
+       return m.upperBounds
+}
+
+// KeyMetadata is not modelled by the mock and always returns nil.
+func (*mockDataFile) KeyMetadata() []byte {
+       return nil
+}
+
+// SplitOffsets is not modelled by the mock and always returns nil.
+func (*mockDataFile) SplitOffsets() []int64 {
+       return nil
+}
+
+// EqualityFieldIDs is not modelled by the mock and always returns nil.
+func (*mockDataFile) EqualityFieldIDs() []int {
+       return nil
+}
+
+// SortOrderID is not modelled by the mock and always returns nil.
+func (*mockDataFile) SortOrderID() *int {
+       return nil
+}
+
+// InclusiveMetricsTestSuite holds the shared fixtures for the tests that
+// exercise newInclusiveMetricsEvaluator. All fields are populated once in
+// SetupSuite.
+type InclusiveMetricsTestSuite struct {
+       suite.Suite
+
+       // schemaDataFile is the schema the entries of dataFiles are evaluated
+       // against; dataFiles holds four mock files with differing column stats.
+       schemaDataFile *iceberg.Schema
+       dataFiles      [4]iceberg.DataFile
+
+       // schemaDataFileNan and dataFileNan form a separate fixture whose
+       // bounds include NaN values.
+       schemaDataFileNan *iceberg.Schema
+       dataFileNan       iceberg.DataFile
+}
+
+// SetupSuite builds the schemas and mock data files shared by every test in
+// the suite.
+//
+// Bounds are stored in the binary form produced by the literals'
+// MarshalBinary. Encoding errors fail the suite immediately instead of being
+// discarded, so a broken encoder cannot silently yield nil bounds and
+// misleading evaluator results further down.
+func (suite *InclusiveMetricsTestSuite) SetupSuite() {
+       suite.schemaDataFile = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 2, Name: "no_stats", Type: iceberg.PrimitiveTypes.Int32, Required: false},
+               iceberg.NestedField{ID: 3, Name: "required", Type: iceberg.PrimitiveTypes.String, Required: true},
+               iceberg.NestedField{ID: 4, Name: "all_nulls", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 5, Name: "some_nulls", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 6, Name: "no_nulls", Type: iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 7, Name: "all_nans", Type: iceberg.PrimitiveTypes.Float64},
+               iceberg.NestedField{ID: 8, Name: "some_nans", Type: iceberg.PrimitiveTypes.Float32},
+               iceberg.NestedField{ID: 9, Name: "no_nans", Type: iceberg.PrimitiveTypes.Float32},
+               iceberg.NestedField{ID: 10, Name: "all_nulls_double", Type: iceberg.PrimitiveTypes.Float64},
+               iceberg.NestedField{ID: 11, Name: "all_nans_v1_stats", Type: iceberg.PrimitiveTypes.Float32},
+               iceberg.NestedField{ID: 12, Name: "nan_and_null_only", Type: iceberg.PrimitiveTypes.Float64},
+               iceberg.NestedField{ID: 13, Name: "no_nan_stats", Type: iceberg.PrimitiveTypes.Float64},
+               iceberg.NestedField{ID: 14, Name: "some_empty", Type: iceberg.PrimitiveTypes.String},
+       )
+
+       // mustMarshal encodes a literal and aborts the suite if encoding fails.
+       mustMarshal := func(lit interface{ MarshalBinary() ([]byte, error) }) []byte {
+               data, err := lit.MarshalBinary()
+               suite.Require().NoError(err)
+               return data
+       }
+
+       var (
+               intMin   = mustMarshal(iceberg.Int32Literal(IntMinValue))
+               intMax   = mustMarshal(iceberg.Int32Literal(IntMaxValue))
+               fltNan   = mustMarshal(iceberg.Float32Literal(float32(math.NaN())))
+               dblNan   = mustMarshal(iceberg.Float64Literal(math.NaN()))
+               fltSeven = mustMarshal(iceberg.Float32Literal(7))
+               dblSeven = mustMarshal(iceberg.Float64Literal(7))
+               fltMax   = mustMarshal(iceberg.Float32Literal(22))
+       )
+
+       suite.dataFiles = [4]iceberg.DataFile{
+               &mockDataFile{
+                       path:     "file_1.parquet",
+                       format:   iceberg.ParquetFile,
+                       count:    50,
+                       filesize: 3,
+                       valueCounts: map[int]int64{
+                               4: 50, 5: 50, 6: 50, 7: 50, 8: 50, 9: 50,
+                               10: 50, 11: 50, 12: 50, 13: 50, 14: 50,
+                       },
+                       nullCounts: map[int]int64{4: 50, 5: 10, 6: 0, 10: 50, 11: 0, 12: 1, 14: 8},
+                       nanCounts:  map[int]int64{7: 50, 8: 10, 9: 0},
+                       lowerBounds: map[int][]byte{
+                               1:  intMin,
+                               11: fltNan,
+                               12: dblNan,
+                               14: {},
+                       },
+                       upperBounds: map[int][]byte{
+                               1:  intMax,
+                               11: fltNan,
+                               12: dblNan,
+                               14: []byte("房东整租霍营小区二层两居室"),
+                       },
+               },
+               &mockDataFile{
+                       path:        "file_2.parquet",
+                       format:      iceberg.ParquetFile,
+                       count:       50,
+                       filesize:    3,
+                       valueCounts: map[int]int64{3: 20},
+                       nullCounts:  map[int]int64{3: 2},
+                       nanCounts:   nil,
+                       lowerBounds: map[int][]byte{3: {'a', 'a'}},
+                       upperBounds: map[int][]byte{3: {'d', 'C'}},
+               },
+               &mockDataFile{
+                       path:        "file_3.parquet",
+                       format:      iceberg.ParquetFile,
+                       count:       50,
+                       filesize:    3,
+                       valueCounts: map[int]int64{3: 20},
+                       nullCounts:  map[int]int64{3: 2},
+                       nanCounts:   nil,
+                       lowerBounds: map[int][]byte{3: []byte("1str1")},
+                       upperBounds: map[int][]byte{3: []byte("3str3")},
+               },
+               &mockDataFile{
+                       path:        "file_4.parquet",
+                       format:      iceberg.ParquetFile,
+                       count:       50,
+                       filesize:    3,
+                       valueCounts: map[int]int64{3: 20},
+                       nullCounts:  map[int]int64{3: 2},
+                       nanCounts:   nil,
+                       lowerBounds: map[int][]byte{3: []byte("abc")},
+                       upperBounds: map[int][]byte{3: []byte("イロハニホヘト")},
+               },
+       }
+
+       suite.schemaDataFileNan = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "all_nan", Type: iceberg.PrimitiveTypes.Float64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "max_nan", Type: iceberg.PrimitiveTypes.Float64, Required: true},
+               iceberg.NestedField{ID: 3, Name: "min_max_nan", Type: iceberg.PrimitiveTypes.Float32},
+               iceberg.NestedField{ID: 4, Name: "all_nan_null_bounds", Type: iceberg.PrimitiveTypes.Float64, Required: true},
+               iceberg.NestedField{ID: 5, Name: "some_nan_correct_bounds", Type: iceberg.PrimitiveTypes.Float32},
+       )
+
+       suite.dataFileNan = &mockDataFile{
+               path:        "file.avro",
+               format:      iceberg.AvroFile,
+               count:       50,
+               filesize:    3,
+               columnSizes: map[int]int64{1: 10, 2: 10, 3: 10, 4: 10, 5: 10},
+               valueCounts: map[int]int64{1: 10, 2: 10, 3: 10, 4: 10, 5: 10},
+               nullCounts:  map[int]int64{1: 0, 2: 0, 3: 0, 4: 0, 5: 0},
+               nanCounts:   map[int]int64{1: 10, 4: 10, 5: 5},
+               lowerBounds: map[int][]byte{
+                       1: dblNan,
+                       2: dblSeven,
+                       3: fltNan,
+                       5: fltSeven,
+               },
+               upperBounds: map[int][]byte{
+                       1: dblNan,
+                       2: dblNan,
+                       3: fltNan,
+                       5: fltMax,
+               },
+       }
+}
+
+// TestAllNull evaluates predicates on the "all_nulls", "some_nulls" and
+// "no_nulls" columns against the first mock data file and checks whether
+// the evaluator reports the file as readable.
+func (suite *InclusiveMetricsTestSuite) TestAllNull() {
+       var (
+               allNull  = iceberg.Reference("all_nulls")
+               someNull = iceberg.Reference("some_nulls")
+               noNull   = iceberg.Reference("no_nulls")
+       )
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.NotNull(allNull), expected: false, msg: "should skip: no non-null value in all null column"},
+               {expr: iceberg.LessThan(allNull, "a"), expected: false, msg: "should skip: lessThan on all null column"},
+               {expr: iceberg.LessThanEqual(allNull, "a"), expected: false, msg: "should skip: lessThanEqual on all null column"},
+               {expr: iceberg.GreaterThan(allNull, "a"), expected: false, msg: "should skip: greaterThan on all null column"},
+               {expr: iceberg.GreaterThanEqual(allNull, "a"), expected: false, msg: "should skip: greaterThanEqual on all null column"},
+               {expr: iceberg.EqualTo(allNull, "a"), expected: false, msg: "should skip: equal on all null column"},
+               {expr: iceberg.NotNull(someNull), expected: true, msg: "should read: column with some nulls contains a non-null value"},
+               {expr: iceberg.NotNull(noNull), expected: true, msg: "should read: non-null column contains a non-null value"},
+               {expr: iceberg.StartsWith(allNull, "asad"), expected: false, msg: "should skip: starts with on all null column"},
+               {expr: iceberg.NotStartsWith(allNull, "asad"), expected: true, msg: "should read: notStartsWith on all null column"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestNoNulls evaluates IsNull on the "all_nulls", "some_nulls" and
+// "no_nulls" columns against the first mock data file.
+func (suite *InclusiveMetricsTestSuite) TestNoNulls() {
+       var (
+               allNull  = iceberg.Reference("all_nulls")
+               someNull = iceberg.Reference("some_nulls")
+               noNull   = iceberg.Reference("no_nulls")
+       )
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.IsNull(allNull), expected: true, msg: "should read: at least one null value in all null column"},
+               {expr: iceberg.IsNull(someNull), expected: true, msg: "should read: column with some nulls contains a null value"},
+               {expr: iceberg.IsNull(noNull), expected: false, msg: "should skip: non-null column contains no null values"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestIsNan evaluates IsNaN on each floating-point column of the first mock
+// data file and checks whether the evaluator reports the file as readable.
+func (suite *InclusiveMetricsTestSuite) TestIsNan() {
+       var (
+               allNan      = iceberg.Reference("all_nans")
+               someNan     = iceberg.Reference("some_nans")
+               noNan       = iceberg.Reference("no_nans")
+               allNullsDbl = iceberg.Reference("all_nulls_double")
+               noNanStats  = iceberg.Reference("no_nan_stats")
+               allNansV1   = iceberg.Reference("all_nans_v1_stats")
+               nanNullOnly = iceberg.Reference("nan_and_null_only")
+       )
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.IsNaN(allNan), expected: true, msg: "should read: at least one nan value in all nan column"},
+               {expr: iceberg.IsNaN(someNan), expected: true, msg: "should read: at least one nan value in some nan column"},
+               {expr: iceberg.IsNaN(noNan), expected: false, msg: "should skip: no-nans column has no nans"},
+               {expr: iceberg.IsNaN(allNullsDbl), expected: false, msg: "should skip: all-null column doesn't contain nan values"},
+               {expr: iceberg.IsNaN(noNanStats), expected: true, msg: "should read: no guarantee if contains nan without stats"},
+               {expr: iceberg.IsNaN(allNansV1), expected: true, msg: "should read: at least one nan value in all nan column"},
+               {expr: iceberg.IsNaN(nanNullOnly), expected: true, msg: "should read: at least one nan value in nan and nulls only column"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestNotNaN evaluates NotNaN on each floating-point column of the first
+// mock data file and checks whether the evaluator reports the file as
+// readable.
+func (suite *InclusiveMetricsTestSuite) TestNotNaN() {
+       var (
+               allNan      = iceberg.Reference("all_nans")
+               someNan     = iceberg.Reference("some_nans")
+               noNan       = iceberg.Reference("no_nans")
+               allNullsDbl = iceberg.Reference("all_nulls_double")
+               noNanStats  = iceberg.Reference("no_nan_stats")
+               allNansV1   = iceberg.Reference("all_nans_v1_stats")
+               nanNullOnly = iceberg.Reference("nan_and_null_only")
+       )
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.NotNaN(allNan), expected: false, msg: "should skip: column with all nans will not contain non-nan"},
+               {expr: iceberg.NotNaN(someNan), expected: true, msg: "should read: at least one non-nan value in some nan column"},
+               {expr: iceberg.NotNaN(noNan), expected: true, msg: "should read: at least one non-nan value in no nan column"},
+               {expr: iceberg.NotNaN(allNullsDbl), expected: true, msg: "should read: at least one non-nan value in all null column"},
+               {expr: iceberg.NotNaN(noNanStats), expected: true, msg: "should read: no guarantee if contains nan without stats"},
+               {expr: iceberg.NotNaN(allNansV1), expected: true, msg: "should read: no guarantee"},
+               {expr: iceberg.NotNaN(nanNullOnly), expected: true, msg: "should read: at least one null value in nan and nulls only column"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestRequiredColumn evaluates NotNull and IsNull on the "required" column
+// against the first mock data file.
+func (suite *InclusiveMetricsTestSuite) TestRequiredColumn() {
+       required := iceberg.Reference("required")
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.NotNull(required), expected: true, msg: "should read: required columns are always non-null"},
+               {expr: iceberg.IsNull(required), expected: false, msg: "should skip: required columns are always non-null"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestMissingColumn expects building an evaluator for a predicate on the
+// "missing" column to fail with iceberg.ErrInvalidSchema.
+func (suite *InclusiveMetricsTestSuite) TestMissingColumn() {
+       filter := iceberg.LessThan(iceberg.Reference("missing"), int32(22))
+
+       _, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, filter, true, true)
+       suite.ErrorIs(err, iceberg.ErrInvalidSchema)
+}
+
+func (suite *InclusiveMetricsTestSuite) TestMissingStats() {
+       noStatsSchema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 2, Name: "no_stats", Type: 
iceberg.PrimitiveTypes.Float64})
+
+       noStatsFile := &mockDataFile{
+               path:   "file_1.parquet",
+               format: iceberg.ParquetFile,
+               count:  50,
+       }
+
+       ref := iceberg.Reference("no_stats")
+       tests := []iceberg.BooleanExpression{
+               iceberg.LessThan(ref, int32(5)),
+               iceberg.LessThanEqual(ref, int32(30)),
+               iceberg.EqualTo(ref, int32(70)),
+               iceberg.GreaterThan(ref, int32(78)),
+               iceberg.GreaterThanEqual(ref, int32(90)),
+               iceberg.NotEqualTo(ref, int32(101)),
+               iceberg.IsNull(ref),
+               iceberg.NotNull(ref),
+               iceberg.IsNaN(ref),
+               iceberg.NotNaN(ref),
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(noStatsSchema, tt, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(noStatsFile)
+                       suite.Require().NoError(err)
+                       suite.True(shouldRead, "should read when stats are 
missing")
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestZeroRecordFileStats() {
+       zeroRecordFile := &mockDataFile{
+               path:   "file_1.parquet",
+               format: iceberg.ParquetFile,
+               count:  0,
+       }
+
+       ref := iceberg.Reference("no_stats")
+       tests := []iceberg.BooleanExpression{
+               iceberg.LessThan(ref, int32(5)),
+               iceberg.LessThanEqual(ref, int32(30)),
+               iceberg.EqualTo(ref, int32(70)),
+               iceberg.GreaterThan(ref, int32(78)),
+               iceberg.GreaterThanEqual(ref, int32(90)),
+               iceberg.NotEqualTo(ref, int32(101)),
+               iceberg.IsNull(ref),
+               iceberg.NotNull(ref),
+               iceberg.IsNaN(ref),
+               iceberg.NotNaN(ref),
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt, true, false)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(zeroRecordFile)
+                       suite.Require().NoError(err)
+                       suite.False(shouldRead, "should skip datafile without 
records")
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestNot() {
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.NewNot(iceberg.LessThan(iceberg.Reference("id"), 
IntMinValue-25)), true, "should read: not(false)"},
+               {iceberg.NewNot(iceberg.GreaterThan(iceberg.Reference("id"), 
IntMinValue-25)), false, "should skip: not(true)"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestAnd() {
+       ref := iceberg.Reference("id")
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.NewAnd(
+                       iceberg.LessThan(ref, IntMinValue-25),
+                       iceberg.GreaterThanEqual(ref, IntMinValue-30)), false, 
"should skip: and(false, true)"},
+               {iceberg.NewAnd(
+                       iceberg.LessThan(ref, IntMinValue-25),
+                       iceberg.GreaterThanEqual(ref, IntMinValue+1)), false, 
"should skip: and(false, false)"},
+               {iceberg.NewAnd(
+                       iceberg.GreaterThan(ref, IntMinValue-25),
+                       iceberg.LessThanEqual(ref, IntMinValue)), true, "should 
read: and(true, true)"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestOr() {
+       ref := iceberg.Reference("id")
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.NewOr(
+                       iceberg.LessThan(ref, IntMinValue-25),
+                       iceberg.GreaterThanEqual(ref, IntMaxValue+1)), false, 
"should skip: or(false, false)"},
+               {iceberg.NewOr(
+                       iceberg.LessThan(ref, IntMinValue-25),
+                       iceberg.GreaterThanEqual(ref, IntMaxValue-19)), true, 
"should read: or(false, true)"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+// TestIntLt evaluates LessThan on "id" around the lower and upper bounds of
+// the first mock data file.
+func (suite *InclusiveMetricsTestSuite) TestIntLt() {
+       id := iceberg.Reference("id")
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.LessThan(id, IntMinValue-25), expected: false, msg: "should skip: id range below lower bound (5 < 30)"},
+               {expr: iceberg.LessThan(id, IntMinValue), expected: false, msg: "should skip: id range below lower bound (30 is not < 30)"},
+               {expr: iceberg.LessThan(id, IntMinValue+1), expected: true, msg: "should read: one possible id"},
+               {expr: iceberg.LessThan(id, IntMaxValue), expected: true, msg: "should read: many possible ids"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntLtEq() {
+       ref := iceberg.Reference("id")
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.LessThanEqual(ref, IntMinValue-25), false, "should 
skip: id range below lower bound (5 < 30)"},
+               {iceberg.LessThanEqual(ref, IntMinValue-1), false, "should 
skip: id range below lower bound (29 is not <= 30)"},
+               {iceberg.LessThanEqual(ref, IntMinValue), true, "should read: 
one possible id"},
+               {iceberg.LessThanEqual(ref, IntMaxValue), true, "should read: 
many possible ids"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntGt() {
+       ref := iceberg.Reference("id")
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.GreaterThan(ref, IntMaxValue+6), false, "should skip: 
id range above upper bound (85 > 79)"},
+               {iceberg.GreaterThan(ref, IntMaxValue), false, "should skip: id 
range above upper bound (79 is not > 79)"},
+               {iceberg.GreaterThan(ref, IntMinValue-1), true, "should read: 
one possible id"},
+               {iceberg.GreaterThan(ref, IntMaxValue-4), true, "should read: 
many possible ids"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+func (suite *InclusiveMetricsTestSuite) TestIntGtEq() {
+       ref := iceberg.Reference("id")
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.GreaterThanEqual(ref, IntMaxValue+6), false, "should 
skip: id range above upper bound (85 < 79)"},
+               {iceberg.GreaterThanEqual(ref, IntMaxValue+1), false, "should 
skip: id range above upper bound (80 > 79)"},
+               {iceberg.GreaterThanEqual(ref, IntMaxValue), true, "should 
read: one possible id"},
+               {iceberg.GreaterThanEqual(ref, IntMaxValue-4), true, "should 
read: many possible ids"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := 
newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+// TestIntEq evaluates EqualTo on "id" below, at, between and above the
+// bounds of the first mock data file.
+func (suite *InclusiveMetricsTestSuite) TestIntEq() {
+       id := iceberg.Reference("id")
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.EqualTo(id, IntMinValue-25), expected: false, msg: "should skip: id range below lower bound"},
+               {expr: iceberg.EqualTo(id, IntMinValue-1), expected: false, msg: "should skip: id range below lower bound"},
+               {expr: iceberg.EqualTo(id, IntMinValue), expected: true, msg: "should read: id equal to lower bound"},
+               {expr: iceberg.EqualTo(id, IntMaxValue-4), expected: true, msg: "should read: id between lower and upper bounds"},
+               {expr: iceberg.EqualTo(id, IntMaxValue), expected: true, msg: "should read: id equal to upper bound"},
+               {expr: iceberg.EqualTo(id, IntMaxValue+1), expected: false, msg: "should skip: id above upper bound"},
+               {expr: iceberg.EqualTo(id, IntMaxValue+6), expected: false, msg: "should skip: id above upper bound"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestIntNeq evaluates NotEqualTo on "id" below, at, between and above the
+// bounds of the first mock data file; every case expects the file to be
+// read.
+func (suite *InclusiveMetricsTestSuite) TestIntNeq() {
+       id := iceberg.Reference("id")
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.NotEqualTo(id, IntMinValue-25), expected: true, msg: "should read: id range below lower bound"},
+               {expr: iceberg.NotEqualTo(id, IntMinValue-1), expected: true, msg: "should read: id range below lower bound"},
+               {expr: iceberg.NotEqualTo(id, IntMinValue), expected: true, msg: "should read: id equal to lower bound"},
+               {expr: iceberg.NotEqualTo(id, IntMaxValue-4), expected: true, msg: "should read: id between lower and upper bounds"},
+               {expr: iceberg.NotEqualTo(id, IntMaxValue), expected: true, msg: "should read: id equal to upper bound"},
+               {expr: iceberg.NotEqualTo(id, IntMaxValue+1), expected: true, msg: "should read: id above upper bound"},
+               {expr: iceberg.NotEqualTo(id, IntMaxValue+6), expected: true, msg: "should read: id above upper bound"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestIntNeqRewritten wraps each EqualTo predicate on "id" in Not before
+// building the evaluator and expects every case to report the first mock
+// data file as readable. Subtests are named after the un-negated predicate.
+func (suite *InclusiveMetricsTestSuite) TestIntNeqRewritten() {
+       id := iceberg.Reference("id")
+
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {expr: iceberg.EqualTo(id, IntMinValue-25), expected: true, msg: "should read: id range below lower bound"},
+               {expr: iceberg.EqualTo(id, IntMinValue-1), expected: true, msg: "should read: id range below lower bound"},
+               {expr: iceberg.EqualTo(id, IntMinValue), expected: true, msg: "should read: id equal to lower bound"},
+               {expr: iceberg.EqualTo(id, IntMaxValue-4), expected: true, msg: "should read: id between lower and upper bounds"},
+               {expr: iceberg.EqualTo(id, IntMaxValue), expected: true, msg: "should read: id equal to upper bound"},
+               {expr: iceberg.EqualTo(id, IntMaxValue+1), expected: true, msg: "should read: id above upper bound"},
+               {expr: iceberg.EqualTo(id, IntMaxValue+6), expected: true, msg: "should read: id above upper bound"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       negated := iceberg.NewNot(tc.expr)
+                       evaluate, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, negated, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluate(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestIntNeqRewrittenCaseInsensitive repeats the Not(EqualTo(...)) checks
+// using the upper-case reference "ID" with case-sensitive binding disabled;
+// the file must still be read in every case.
+func (suite *InclusiveMetricsTestSuite) TestIntNeqRewrittenCaseInsensitive() {
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       idRef := iceberg.Reference("ID")
+       cases := []testCase{
+               {iceberg.EqualTo(idRef, IntMinValue-25), true, "should read: id range below lower bound"},
+               {iceberg.EqualTo(idRef, IntMinValue-1), true, "should read: id range below lower bound"},
+               {iceberg.EqualTo(idRef, IntMinValue), true, "should read: id equal to lower bound"},
+               {iceberg.EqualTo(idRef, IntMaxValue-4), true, "should read: id between lower and upper bounds"},
+               {iceberg.EqualTo(idRef, IntMaxValue), true, "should read: id equal to upper bound"},
+               {iceberg.EqualTo(idRef, IntMaxValue+1), true, "should read: id above upper bound"},
+               {iceberg.EqualTo(idRef, IntMaxValue+6), true, "should read: id above upper bound"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       // third argument false: bind the reference case-insensitively
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, iceberg.NewNot(tc.expr), false, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestInMetrics evaluates IsIn predicates against the first data file:
+// value sets entirely outside the id bounds (or on an all-null column) must
+// skip the file, everything else must read it.
+func (suite *InclusiveMetricsTestSuite) TestInMetrics() {
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       idRef := iceberg.Reference("id")
+
+       // 0..399, used for the large IN-list case
+       const numIDs = 400
+       ids := make([]int32, 0, numIDs)
+       for v := int32(0); v < numIDs; v++ {
+               ids = append(ids, v)
+       }
+
+       cases := []testCase{
+               {iceberg.IsIn(idRef, IntMinValue-25, IntMinValue-24), false, "should skip: id below lower bound"},
+               {iceberg.IsIn(idRef, IntMinValue-2, IntMinValue-1), false, "should skip: id below lower bound"},
+               {iceberg.IsIn(idRef, IntMinValue-1, IntMinValue), true, "should read: id equal to lower bound"},
+               {iceberg.IsIn(idRef, IntMaxValue-4, IntMaxValue-3), true, "should read: id between upper and lower bounds"},
+               {iceberg.IsIn(idRef, IntMaxValue, IntMaxValue+1), true, "should read: id equal to upper bound"},
+               {iceberg.IsIn(idRef, IntMaxValue+1, IntMaxValue+2), false, "should skip: id above upper bound"},
+               {iceberg.IsIn(idRef, IntMaxValue+6, IntMaxValue+7), false, "should skip: id above upper bound"},
+               {iceberg.IsIn(iceberg.Reference("all_nulls"), "abc", "def"), false, "should skip: in on all nulls column"},
+               {iceberg.IsIn(iceberg.Reference("some_nulls"), "abc", "def"), true, "should read: in on some nulls column"},
+               {iceberg.IsIn(iceberg.Reference("no_nulls"), "abc", "def"), true, "should read: in on no nulls column"},
+               {iceberg.IsIn(idRef, ids...), true, "should read: large in expression"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestNotInMetrics evaluates NotIn predicates against the first data file.
+// Every case expects the file to be read, whether the excluded values lie
+// outside, on, or inside the id bounds, and regardless of the null counts of
+// the string columns.
+func (suite *InclusiveMetricsTestSuite) TestNotInMetrics() {
+       ref := iceberg.Reference("id")
+
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.NotIn(ref, IntMinValue-25, IntMinValue-24), true, "should read: id below lower bound"},
+               {iceberg.NotIn(ref, IntMinValue-2, IntMinValue-1), true, "should read: id below lower bound"},
+               {iceberg.NotIn(ref, IntMinValue-1, IntMinValue), true, "should read: id equal to lower bound"},
+               {iceberg.NotIn(ref, IntMaxValue-4, IntMaxValue-3), true, "should read: id between upper and lower bounds"},
+               {iceberg.NotIn(ref, IntMaxValue, IntMaxValue+1), true, "should read: id equal to upper bound"},
+               {iceberg.NotIn(ref, IntMaxValue+1, IntMaxValue+2), true, "should read: id above upper bound"},
+               {iceberg.NotIn(ref, IntMaxValue+6, IntMaxValue+7), true, "should read: id above upper bound"},
+               // failure messages name the predicate under test (notIn), not "in"
+               {iceberg.NotIn(iceberg.Reference("all_nulls"), "abc", "def"), true, "should read: notIn on all nulls column"},
+               {iceberg.NotIn(iceberg.Reference("some_nulls"), "abc", "def"), true, "should read: notIn on some nulls column"},
+               {iceberg.NotIn(iceberg.Reference("no_nulls"), "abc", "def"), true, "should read: notIn on no nulls column"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFiles[0])
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+// TestLessAndLessEqualNans runs the same expectations for LessThan and
+// LessThanEqual against the NaN data file: all-NaN columns are skipped,
+// columns with usable lower bounds are compared against the literal, and a
+// column whose bounds are both NaN is always read.
+func (suite *InclusiveMetricsTestSuite) TestLessAndLessEqualNans() {
+       type predicateFn func(iceberg.UnboundTerm, int32) iceberg.UnboundPredicate
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       operators := []predicateFn{iceberg.LessThan[int32], iceberg.LessThanEqual[int32]}
+       for _, op := range operators {
+               cases := []testCase{
+                       {op(iceberg.Reference("all_nan"), int32(1)), false, "should skip: all nan column doesn't contain number"},
+                       {op(iceberg.Reference("max_nan"), int32(1)), false, "should skip: 1 is smaller than lower bound"},
+                       {op(iceberg.Reference("max_nan"), int32(10)), true, "should read: 10 is larger than lower bound"},
+                       {op(iceberg.Reference("min_max_nan"), int32(1)), true, "should read: no visibility"},
+                       {op(iceberg.Reference("all_nan_null_bounds"), int32(1)), false, "should skip: all nan column doesn't contain number"},
+                       {op(iceberg.Reference("some_nan_correct_bounds"), int32(1)), false, "should skip: 1 is smaller than lower bound"},
+                       {op(iceberg.Reference("some_nan_correct_bounds"), int32(10)), true, "should read: 10 is larger than lower bound"},
+               }
+
+               for _, tc := range cases {
+                       suite.Run(tc.expr.String(), func() {
+                               evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tc.expr, true, true)
+                               suite.Require().NoError(err)
+
+                               got, err := evaluator(suite.dataFileNan)
+                               suite.Require().NoError(err)
+                               suite.Equal(tc.expected, got, tc.msg)
+                       })
+               }
+       }
+}
+
+// TestGreaterAndGreaterEqualNans runs the same expectations for GreaterThan
+// and GreaterThanEqual against the NaN data file: all-NaN columns are
+// skipped, columns with a NaN upper bound are always read, and a column with
+// valid bounds is skipped only when the literal exceeds its upper bound.
+func (suite *InclusiveMetricsTestSuite) TestGreaterAndGreaterEqualNans() {
+       type Op func(iceberg.UnboundTerm, int32) iceberg.UnboundPredicate
+       for _, operator := range []Op{iceberg.GreaterThan[int32], iceberg.GreaterThanEqual[int32]} {
+               tests := []struct {
+                       expr     iceberg.BooleanExpression
+                       expected bool
+                       msg      string
+               }{
+                       {operator(iceberg.Reference("all_nan"), int32(1)), false, "should skip: all nan column doesn't contain number"},
+                       {operator(iceberg.Reference("max_nan"), int32(1)), true, "should read: upper bound is larger than 1"},
+                       {operator(iceberg.Reference("max_nan"), int32(10)), true, "should read: 10 is smaller than upper bound"},
+                       {operator(iceberg.Reference("min_max_nan"), int32(1)), true, "should read: no visibility"},
+                       {operator(iceberg.Reference("all_nan_null_bounds"), int32(1)), false, "should skip: all nan column doesn't contain number"},
+                       {operator(iceberg.Reference("some_nan_correct_bounds"), int32(1)), true, "should read: 1 is smaller than upper bound"},
+                       {operator(iceberg.Reference("some_nan_correct_bounds"), int32(10)), true, "should read: 10 is smaller than upper bound"},
+                       // The upper-bound case must target a column that actually has a
+                       // usable upper bound; all_nan is skipped for a different reason
+                       // and is already covered by the first case.
+                       {operator(iceberg.Reference("some_nan_correct_bounds"), int32(30)), false, "should skip: 30 is larger than upper bound"},
+               }
+
+               for _, tt := range tests {
+                       suite.Run(tt.expr.String(), func() {
+                               eval, err := newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+                               suite.Require().NoError(err)
+                               shouldRead, err := eval(suite.dataFileNan)
+                               suite.Require().NoError(err)
+                               suite.Equal(tt.expected, shouldRead, tt.msg)
+                       })
+               }
+       }
+}
+
+// TestEqualsNans evaluates EqualTo predicates against the NaN data file:
+// all-NaN columns are skipped, a column whose bounds are both NaN is always
+// read, and columns with usable bounds are skipped when the literal falls
+// outside them.
+func (suite *InclusiveMetricsTestSuite) TestEqualsNans() {
+       tests := []struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }{
+               {iceberg.EqualTo(iceberg.Reference("all_nan"), int32(1)), false, "should skip: all nan column doesn't contain number"},
+               {iceberg.EqualTo(iceberg.Reference("max_nan"), int32(1)), false, "should skip: 1 is smaller than lower bound"},
+               {iceberg.EqualTo(iceberg.Reference("max_nan"), int32(10)), true, "should read: 10 is within bounds"},
+               {iceberg.EqualTo(iceberg.Reference("min_max_nan"), int32(1)), true, "should read: no visibility"},
+               {iceberg.EqualTo(iceberg.Reference("all_nan_null_bounds"), int32(1)), false, "should skip: all nan column doesn't contain number"},
+               {iceberg.EqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(1)), false, "should skip: 1 is smaller than lower bound"},
+               {iceberg.EqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(10)), true, "should read: 10 within bounds"},
+               // The upper-bound case must target a column that actually has a
+               // usable upper bound; all_nan is already covered by the first case.
+               {iceberg.EqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(30)), false, "should skip: 30 is larger than upper bound"},
+       }
+
+       for _, tt := range tests {
+               suite.Run(tt.expr.String(), func() {
+                       eval, err := newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tt.expr, true, true)
+                       suite.Require().NoError(err)
+                       shouldRead, err := eval(suite.dataFileNan)
+                       suite.Require().NoError(err)
+                       suite.Equal(tt.expected, shouldRead, tt.msg)
+               })
+       }
+}
+
+// TestNotEqualsNans evaluates NotEqualTo predicates against the NaN data
+// file. Every case expects the file to be read.
+func (suite *InclusiveMetricsTestSuite) TestNotEqualsNans() {
+       // all cases share the same expectation and failure message
+       const (
+               expected = true
+               msg      = "should read: no visibility"
+       )
+
+       exprs := []iceberg.BooleanExpression{
+               iceberg.NotEqualTo(iceberg.Reference("all_nan"), int32(1)),
+               iceberg.NotEqualTo(iceberg.Reference("max_nan"), int32(1)),
+               iceberg.NotEqualTo(iceberg.Reference("max_nan"), int32(10)),
+               iceberg.NotEqualTo(iceberg.Reference("min_max_nan"), int32(1)),
+               iceberg.NotEqualTo(iceberg.Reference("all_nan_null_bounds"), int32(1)),
+               iceberg.NotEqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(1)),
+               iceberg.NotEqualTo(iceberg.Reference("some_nan_correct_bounds"), int32(10)),
+               iceberg.NotEqualTo(iceberg.Reference("all_nan"), int32(30)),
+       }
+
+       for _, expr := range exprs {
+               suite.Run(expr.String(), func() {
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFileNan, expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(suite.dataFileNan)
+                       suite.Require().NoError(err)
+                       suite.Equal(expected, got, msg)
+               })
+       }
+}
+
+// TestInWithNans evaluates IsIn predicates against the NaN data file:
+// all-NaN columns are skipped, a column whose bounds are both NaN is always
+// read, and a column with usable bounds is read only when at least one
+// listed value falls within them.
+func (suite *InclusiveMetricsTestSuite) TestInWithNans() {
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               msg      string
+       }
+
+       cases := []testCase{
+               {iceberg.IsIn(iceberg.Reference("all_nan"), int32(1), 10, 30), false, "should skip: all nan column doesn't contain number"},
+               {iceberg.IsIn(iceberg.Reference("max_nan"), int32(1), 10, 30), true, "should read: 10 and 30 are greater than lower bound"},
+               {iceberg.IsIn(iceberg.Reference("min_max_nan"), int32(1), 10, 30), true, "should read: no visibility"},
+               {iceberg.IsIn(iceberg.Reference("all_nan_null_bounds"), int32(1), 10, 30), false, "should skip: all nan column doesn't contain number"},
+               {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"), int32(1), 10, 30), true, "should read: 10 within bounds"},
+               {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"), int32(1), 30), false, "should skip: 1 and 30 not within bounds"},
+               {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"), int32(5), 7), true, "should read: overlap with lower bound"},
+               {iceberg.IsIn(iceberg.Reference("some_nan_correct_bounds"), int32(22), 25), true, "should read: overlap with upper bound"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFileNan, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(suite.dataFileNan)
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestNotInWithNans evaluates NotIn predicates against the NaN data file.
+// Every case expects the file to be read.
+func (suite *InclusiveMetricsTestSuite) TestNotInWithNans() {
+       // all cases share the same expectation and failure message
+       const (
+               expected = true
+               msg      = "should read: no visibility"
+       )
+
+       exprs := []iceberg.BooleanExpression{
+               iceberg.NotIn(iceberg.Reference("all_nan"), int32(1), 10, 30),
+               iceberg.NotIn(iceberg.Reference("max_nan"), int32(1), 10, 30),
+               iceberg.NotIn(iceberg.Reference("min_max_nan"), int32(1), 10, 30),
+               iceberg.NotIn(iceberg.Reference("all_nan_null_bounds"), int32(1), 10, 30),
+               iceberg.NotIn(iceberg.Reference("some_nan_correct_bounds"), int32(1), 10, 30),
+       }
+
+       for _, expr := range exprs {
+               suite.Run(expr.String(), func() {
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFileNan, expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(suite.dataFileNan)
+                       suite.Require().NoError(err)
+                       suite.Equal(expected, got, msg)
+               })
+       }
+}
+
+// TestStartsWith evaluates StartsWith predicates against several data files:
+// a file is read when it has no stats for the column or when the prefix can
+// fall inside the column's bounds, and skipped otherwise.
+func (suite *InclusiveMetricsTestSuite) TestStartsWith() {
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               dataFile iceberg.DataFile
+               msg      string
+       }
+
+       required := iceberg.Reference("required")
+       someEmpty := iceberg.Reference("some_empty")
+
+       cases := []testCase{
+               {iceberg.StartsWith(required, "a"), true, suite.dataFiles[0], "should read: no stats"},
+               {iceberg.StartsWith(required, "a"), true, suite.dataFiles[1], "should read: range matches"},
+               {iceberg.StartsWith(required, "aa"), true, suite.dataFiles[1], "should read: range matches"},
+               {iceberg.StartsWith(required, "aaa"), true, suite.dataFiles[1], "should read: range matches"},
+               {iceberg.StartsWith(required, "1s"), true, suite.dataFiles[2], "should read: range matches"},
+               {iceberg.StartsWith(required, "1str1x"), true, suite.dataFiles[2], "should read: range matches"},
+               {iceberg.StartsWith(required, "ff"), true, suite.dataFiles[3], "should read: range matches"},
+               {iceberg.StartsWith(required, "aB"), false, suite.dataFiles[1], "should skip: range doesn't match"},
+               {iceberg.StartsWith(required, "dWx"), false, suite.dataFiles[1], "should skip: range doesn't match"},
+               {iceberg.StartsWith(required, "5"), false, suite.dataFiles[2], "should skip: range doesn't match"},
+               {iceberg.StartsWith(required, "3str3x"), false, suite.dataFiles[2], "should skip: range doesn't match"},
+               {iceberg.StartsWith(someEmpty, "房东整租霍"), true, suite.dataFiles[0], "should read: range matches"},
+               {iceberg.StartsWith(iceberg.Reference("all_nulls"), ""), false, suite.dataFiles[0], "should skip: range doesn't match"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(tc.dataFile)
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestNotStartsWith evaluates NotStartsWith predicates against several data
+// files. Every case listed here expects the file to be read.
+func (suite *InclusiveMetricsTestSuite) TestNotStartsWith() {
+       type testCase struct {
+               expr     iceberg.BooleanExpression
+               expected bool
+               dataFile iceberg.DataFile
+               msg      string
+       }
+
+       required := iceberg.Reference("required")
+       someEmpty := iceberg.Reference("some_empty")
+
+       cases := []testCase{
+               {iceberg.NotStartsWith(required, "a"), true, suite.dataFiles[0], "should read: no stats"},
+               {iceberg.NotStartsWith(required, "a"), true, suite.dataFiles[1], "should read: range matches"},
+               {iceberg.NotStartsWith(required, "aa"), true, suite.dataFiles[1], "should read: range matches"},
+               {iceberg.NotStartsWith(required, "aaa"), true, suite.dataFiles[1], "should read: range matches"},
+               {iceberg.NotStartsWith(required, "1s"), true, suite.dataFiles[2], "should read: range matches"},
+               {iceberg.NotStartsWith(required, "1str1x"), true, suite.dataFiles[2], "should read: range matches"},
+               {iceberg.NotStartsWith(required, "ff"), true, suite.dataFiles[3], "should read: range matches"},
+               {iceberg.NotStartsWith(required, "aB"), true, suite.dataFiles[1], "should read: range doesn't match"},
+               {iceberg.NotStartsWith(required, "dWx"), true, suite.dataFiles[1], "should read: range doesn't match"},
+               {iceberg.NotStartsWith(required, "5"), true, suite.dataFiles[2], "should read: range doesn't match"},
+               {iceberg.NotStartsWith(required, "3str3x"), true, suite.dataFiles[2], "should read: range doesn't match"},
+               {iceberg.NotStartsWith(someEmpty, "房东整租霍"), true, suite.dataFiles[0], "should read: range matches"},
+       }
+
+       for _, tc := range cases {
+               suite.Run(tc.expr.String(), func() {
+                       evaluator, err := newInclusiveMetricsEvaluator(suite.schemaDataFile, tc.expr, true, true)
+                       suite.Require().NoError(err)
+
+                       got, err := evaluator(tc.dataFile)
+                       suite.Require().NoError(err)
+                       suite.Equal(tc.expected, got, tc.msg)
+               })
+       }
+}
+
+// TestEvaluators is the go test entry point that runs the projection and
+// inclusive-metrics suites, in that order.
+func TestEvaluators(t *testing.T) {
+       suite.Run(t, new(ProjectionTestSuite))
+       suite.Run(t, new(InclusiveMetricsTestSuite))
+}
diff --git a/table/scanner.go b/table/scanner.go
new file mode 100644
index 0000000..4417fc1
--- /dev/null
+++ b/table/scanner.go
@@ -0,0 +1,395 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "cmp"
+       "context"
+       "fmt"
+       "runtime"
+       "slices"
+       "sync"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/io"
+)
+
+// keyDefaultMap is a map guarded by a read/write mutex that lazily creates
+// the value for a missing key by calling defaultFactory with that key.
+type keyDefaultMap[K comparable, V any] struct {
+       defaultFactory func(K) V
+       data           map[K]V
+
+       mx sync.RWMutex
+}
+
+// Get returns the value stored for key, creating and storing it with
+// defaultFactory on first access. Once a value has been stored for a key,
+// every later call returns that same value.
+//
+// The factory runs while the write lock is held, so it must not call Get on
+// the same map.
+func (k *keyDefaultMap[K, V]) Get(key K) V {
+       // fast path: the value already exists, only a read lock is needed
+       k.mx.RLock()
+       v, ok := k.data[key]
+       k.mx.RUnlock()
+       if ok {
+               return v
+       }
+
+       k.mx.Lock()
+       defer k.mx.Unlock()
+
+       // Another goroutine may have stored the value between releasing the
+       // read lock and acquiring the write lock. Re-check so the factory is
+       // not run again and the already handed-out value is not replaced.
+       if v, ok := k.data[key]; ok {
+               return v
+       }
+
+       // tolerate a zero-value keyDefaultMap whose map was never allocated
+       if k.data == nil {
+               k.data = make(map[K]V)
+       }
+
+       v = k.defaultFactory(key)
+       k.data[key] = v
+       return v
+}
+
+// newKeyDefaultMap returns an empty keyDefaultMap that uses factory to
+// create the value for any key that has not been requested before.
+func newKeyDefaultMap[K comparable, V any](factory func(K) V) keyDefaultMap[K, V] {
+       return keyDefaultMap[K, V]{
+               defaultFactory: factory,
+               data:           map[K]V{},
+       }
+}
+
+// newKeyDefaultMapWrapErr returns an empty keyDefaultMap for a factory that
+// can fail. A non-nil error from factory is turned into a panic carrying
+// that error, so callers of Get see either a value or a panic.
+func newKeyDefaultMapWrapErr[K comparable, V any](factory func(K) (V, error)) keyDefaultMap[K, V] {
+       mustCreate := func(key K) V {
+               val, err := factory(key)
+               if err == nil {
+                       return val
+               }
+               panic(err)
+       }
+
+       return keyDefaultMap[K, V]{
+               defaultFactory: mustCreate,
+               data:           map[K]V{},
+       }
+}
+
+// partitionRecord is a positional tuple of partition values.
+type partitionRecord []any
+
+// Size reports how many values the record holds.
+func (p partitionRecord) Size() int {
+       return len(p)
+}
+
+// Get returns the value at position pos; it panics if pos is out of range.
+func (p partitionRecord) Get(pos int) any {
+       return p[pos]
+}
+
+// Set overwrites the value at position pos; it panics if pos is out of range.
+func (p partitionRecord) Set(pos int, val any) {
+       p[pos] = val
+}
+
+// getPartitionRecord arranges the partition values of dataFile in the field
+// order of partitionType. Values are looked up by field name; a field that
+// has no entry in the file's partition data yields nil at its position.
+func getPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) partitionRecord {
+       values := dataFile.Partition()
+       fields := partitionType.FieldList
+
+       rec := make(partitionRecord, 0, len(fields))
+       for _, field := range fields {
+               rec = append(rec, values[field.Name])
+       }
+       return rec
+}
+
+// openManifest fetches the entries of manifest and returns those whose data
+// file is accepted by both partitionFilter and metricsEval. Both predicates
+// are evaluated for every entry; the first error from fetching or from
+// either predicate aborts the call.
+func openManifest(io io.IO, manifest iceberg.ManifestFile,
+       partitionFilter, metricsEval func(iceberg.DataFile) (bool, error)) ([]iceberg.ManifestEntry, error) {
+
+       // NOTE(review): the meaning of the `true` flag is defined by
+       // ManifestFile.FetchEntries and is not visible here.
+       entries, err := manifest.FetchEntries(io, true)
+       if err != nil {
+               return nil, err
+       }
+
+       kept := make([]iceberg.ManifestEntry, 0, len(entries))
+       for _, e := range entries {
+               inPartition, err := partitionFilter(e.DataFile())
+               if err != nil {
+                       return nil, err
+               }
+
+               metricsMatch, err := metricsEval(e.DataFile())
+               if err != nil {
+                       return nil, err
+               }
+
+               if !inPartition || !metricsMatch {
+                       continue
+               }
+               kept = append(kept, e)
+       }
+
+       return kept, nil
+}
+
+// Scan describes a read of a table: which snapshot to read, which rows to
+// keep and which fields to project. PlanFiles uses it to select the files
+// that have to be read.
+type Scan struct {
+       // metadata supplies the schemas, partition specs and snapshots.
+       metadata       Metadata
+       // io is handed to the snapshot and manifest readers.
+       io             io.IO
+       // rowFilter is the predicate used for partition projection and for
+       // metrics-based file pruning.
+       rowFilter      iceberg.BooleanExpression
+       // selectedFields lists the fields to project; "*" selects the whole schema.
+       selectedFields []string
+       // caseSensitive controls how field names are matched.
+       caseSensitive  bool
+       // snapshotID pins the scan to one snapshot; nil means the current snapshot.
+       snapshotID     *int64
+       // options holds scan properties, e.g. "include_empty_files".
+       options        iceberg.Properties
+
+       // partitionFilters caches the projected partition filter per partition
+       // spec ID.
+       partitionFilters keyDefaultMap[int, iceberg.BooleanExpression]
+}
+
+// UseRef returns a copy of the scan pinned to the snapshot that the named
+// reference points to. It fails with iceberg.ErrInvalidArgument when the
+// scan already has a snapshot ID or when no snapshot exists for name. The
+// receiver is not modified.
+func (s *Scan) UseRef(name string) (*Scan, error) {
+       if s.snapshotID != nil {
+               return nil, fmt.Errorf("%w: cannot override ref, already set snapshot id %d",
+                       iceberg.ErrInvalidArgument, *s.snapshotID)
+       }
+
+       snap := s.metadata.SnapshotByName(name)
+       if snap == nil {
+               return nil, fmt.Errorf("%w: cannot scan unknown ref=%s", iceberg.ErrInvalidArgument, name)
+       }
+
+       pinned := new(Scan)
+       pinned.metadata = s.metadata
+       pinned.io = s.io
+       pinned.rowFilter = s.rowFilter
+       pinned.selectedFields = s.selectedFields
+       pinned.caseSensitive = s.caseSensitive
+       pinned.snapshotID = &snap.SnapshotID
+       pinned.options = s.options
+       // the filter cache is bound to the new scan, so it starts out empty
+       pinned.partitionFilters = newKeyDefaultMapWrapErr(pinned.buildPartitionProjection)
+
+       return pinned, nil
+}
+
+// Snapshot returns the snapshot this scan reads: the one identified by the
+// pinned snapshot ID if set, otherwise the table's current snapshot. The
+// result is whatever the metadata lookup returns, which PlanFiles treats as
+// possibly nil.
+func (s *Scan) Snapshot() *Snapshot {
+       if s.snapshotID == nil {
+               return s.metadata.CurrentSnapshot()
+       }
+       return s.metadata.SnapshotByID(*s.snapshotID)
+}
+
+// Projection returns the schema the scan produces. It starts from the
+// table's current schema, or, when the scan is pinned to a snapshot that
+// records a schema ID matching one of the table's schemas, from that schema.
+// If the selected fields contain "*" the whole schema is returned; otherwise
+// the schema is narrowed to the selected fields.
+//
+// It fails with ErrInvalidOperation when the pinned snapshot cannot be found.
+func (s *Scan) Projection() (*iceberg.Schema, error) {
+       schema := s.metadata.CurrentSchema()
+
+       if id := s.snapshotID; id != nil {
+               snap := s.metadata.SnapshotByID(*id)
+               if snap == nil {
+                       return nil, fmt.Errorf("%w: snapshot not found: %d", ErrInvalidOperation, *id)
+               }
+
+               if want := snap.SchemaID; want != nil {
+                       for _, candidate := range s.metadata.Schemas() {
+                               if candidate.ID != *want {
+                                       continue
+                               }
+                               schema = candidate
+                               break
+                       }
+               }
+       }
+
+       if selectAll := slices.Contains(s.selectedFields, "*"); selectAll {
+               return schema, nil
+       }
+
+       return schema.Select(s.caseSensitive, s.selectedFields...)
+}
+
+// buildPartitionProjection projects the scan's row filter onto the partition
+// spec at index specID of the table's partition specs, producing the filter
+// that is applied to partition data. It is the factory behind
+// s.partitionFilters.
+//
+// The projection honours the scan's case sensitivity instead of always
+// running case-sensitively, so it agrees with the manifest, partition and
+// metrics evaluators, which all receive s.caseSensitive.
+// NOTE(review): assumes the third argument of newInclusiveProjection is the
+// case-sensitivity flag, as for the other evaluator constructors.
+func (s *Scan) buildPartitionProjection(specID int) (iceberg.BooleanExpression, error) {
+       project := newInclusiveProjection(s.metadata.CurrentSchema(),
+               s.metadata.PartitionSpecs()[specID], s.caseSensitive)
+       return project(s.rowFilter)
+}
+
+// buildManifestEvaluator creates the manifest-level filter for the partition
+// spec at index specID, using the cached partition filter for that spec.
+func (s *Scan) buildManifestEvaluator(specID int) (func(iceberg.ManifestFile) (bool, error), error) {
+       spec := s.metadata.PartitionSpecs()[specID]
+       schema := s.metadata.CurrentSchema()
+       filter := s.partitionFilters.Get(specID)
+
+       return newManifestEvaluator(spec, schema, filter, s.caseSensitive)
+}
+
+// buildPartitionEvaluator returns a predicate that reports whether a data
+// file's partition values satisfy the cached partition filter of the
+// partition spec at index specID.
+func (s *Scan) buildPartitionEvaluator(specID int) func(iceberg.DataFile) (bool, error) {
+       spec := s.metadata.PartitionSpecs()[specID]
+       partType := spec.PartitionType(s.metadata.CurrentSchema())
+       partSchema := iceberg.NewSchema(0, partType.FieldList...)
+       partExpr := s.partitionFilters.Get(specID)
+
+       return func(d iceberg.DataFile) (bool, error) {
+               // A fresh evaluator is built on every call.
+               // NOTE(review): presumably because the returned predicate is shared
+               // by concurrent workers; confirm before hoisting this out.
+               evalFn, err := iceberg.ExpressionEvaluator(partSchema, partExpr, s.caseSensitive)
+               if err == nil {
+                       return evalFn(getPartitionRecord(d, partType))
+               }
+               return false, err
+       }
+}
+
+// checkSequenceNumber reports whether manifest has to be opened: data
+// manifests always, delete manifests only when their sequence number is at
+// least minSeqNum, anything else never.
+func (s *Scan) checkSequenceNumber(minSeqNum int64, manifest iceberg.ManifestFile) bool {
+       switch manifest.ManifestContent() {
+       case iceberg.ManifestContentData:
+               return true
+       case iceberg.ManifestContentDeletes:
+               return manifest.SequenceNum() >= minSeqNum
+       default:
+               return false
+       }
+}
+
+// minSequenceNum returns the smallest MinSequenceNum among the data
+// manifests in manifests, or 0 when there is no data manifest.
+//
+// The minimum is seeded from the first data manifest rather than from 0;
+// seeding with 0 would make the result 0 for any non-negative sequence
+// numbers, so the delete-manifest check in checkSequenceNumber could never
+// exclude anything.
+func minSequenceNum(manifests []iceberg.ManifestFile) int64 {
+       var (
+               lowest int64
+               found  bool
+       )
+       for _, m := range manifests {
+               if m.ManifestContent() != iceberg.ManifestContentData {
+                       continue
+               }
+               if seq := m.MinSequenceNum(); !found || seq < lowest {
+                       lowest, found = seq, true
+               }
+       }
+       return lowest
+}
+
+// matchDeletesToData returns the positional delete files that may apply to
+// the data file of entry. positionalDeletes is binary-searched by sequence
+// number, so it must be sorted that way; only deletes from the found
+// position onwards are considered, and of those only the files whose
+// file_path metrics may contain the data file's path are returned.
+func matchDeletesToData(entry iceberg.ManifestEntry, positionalDeletes []iceberg.ManifestEntry) ([]iceberg.DataFile, error) {
+       bySeqNum := func(a, b iceberg.ManifestEntry) int {
+               return cmp.Compare(a.SequenceNum(), b.SequenceNum())
+       }
+       start, _ := slices.BinarySearchFunc(positionalDeletes, entry, bySeqNum)
+
+       mayReference, err := newInclusiveMetricsEvaluator(iceberg.PositionalDeleteSchema,
+               iceberg.EqualTo(iceberg.Reference("file_path"), entry.DataFile().FilePath()), true, false)
+       if err != nil {
+               return nil, err
+       }
+
+       deletes := make([]iceberg.DataFile, 0)
+       for _, candidate := range positionalDeletes[start:] {
+               file := candidate.DataFile()
+               hit, err := mayReference(file)
+               if err != nil {
+                       return nil, err
+               }
+               if !hit {
+                       continue
+               }
+               deletes = append(deletes, file)
+       }
+
+       return deletes, nil
+}
+
+// PlanFiles works out which data files have to be read to satisfy the scan
+// and pairs each of them with the positional delete files matched to it.
+//
+// It returns (nil, nil) when the scan has no snapshot. Manifests are opened
+// by a pool of goroutines; the first failure reported by any of them, or
+// the cancellation of ctx, aborts planning and is returned as the error.
+// Equality delete entries are not supported and produce an error.
+func (s *Scan) PlanFiles(ctx context.Context) ([]FileScanTask, error) {
+       snap := s.Snapshot()
+       if snap == nil {
+               return nil, nil
+       }
+
+       // step 1: filter manifests using partition summaries
+       // the filter depends on the partition spec used to write the manifest
+       // file so create a cache of filters for each spec id
+       manifestEvaluators := newKeyDefaultMapWrapErr(s.buildManifestEvaluator)
+       manifestList, err := snap.Manifests(s.io)
+       if err != nil {
+               return nil, err
+       }
+
+       // remove any manifests that we don't need to use; the first evaluation
+       // error is kept so it is reported instead of silently dropping the
+       // manifest (and all of its files) from the plan
+       var evalErr error
+       manifestList = slices.DeleteFunc(manifestList,
+               func(mf iceberg.ManifestFile) bool {
+                       eval := manifestEvaluators.Get(int(mf.PartitionSpecID()))
+                       use, err := eval(mf)
+                       if err != nil && evalErr == nil {
+                               evalErr = err
+                       }
+                       return !use || err != nil
+               })
+       if evalErr != nil {
+               return nil, evalErr
+       }
+
+       // step 2: filter the data files in each manifest
+       // this filter depends on the partition spec used to write the manifest
+       // file
+       // NOTE(review): partitionEvaluators.Get is called from several
+       // goroutines below; confirm the map type is safe for concurrent use.
+       partitionEvaluators := newKeyDefaultMap(s.buildPartitionEvaluator)
+       metricsEval, err := newInclusiveMetricsEvaluator(
+               s.metadata.CurrentSchema(), s.rowFilter, s.caseSensitive,
+               s.options["include_empty_files"] == "true")
+       if err != nil {
+               return nil, err
+       }
+
+       minSeqNum := minSequenceNum(manifestList)
+       dataEntries := make([]iceberg.ManifestEntry, 0)
+       positionalDeleteEntries := make([]iceberg.ManifestEntry, 0)
+
+       nworkers := runtime.NumCPU()
+       var wg sync.WaitGroup
+
+       // manifestChan is sized to hold every manifest so feeding never blocks
+       manifestChan := make(chan iceberg.ManifestFile, len(manifestList))
+       entryChan := make(chan []iceberg.ManifestEntry, 20)
+
+       ctx, cancel := context.WithCancelCause(ctx)
+       // always release the derived context; this also unblocks any worker
+       // still waiting to send when we return early
+       defer cancel(nil)
+
+       for i := 0; i < nworkers; i++ {
+               wg.Add(1)
+
+               go func() {
+                       defer wg.Done()
+
+                       for {
+                               select {
+                               case m, ok := <-manifestChan:
+                                       if !ok {
+                                               return
+                                       }
+
+                                       if !s.checkSequenceNumber(minSeqNum, m) {
+                                               continue
+                                       }
+
+                                       entries, err := openManifest(s.io, m,
+                                               partitionEvaluators.Get(int(m.PartitionSpecID())),
+                                               metricsEval)
+                                       if err != nil {
+                                               // record the failure as the cancel cause
+                                               // and stop this worker
+                                               cancel(err)
+                                               return
+                                       }
+
+                                       // never block on a consumer that is gone
+                                       select {
+                                       case entryChan <- entries:
+                                       case <-ctx.Done():
+                                               return
+                                       }
+                               case <-ctx.Done():
+                                       return
+                               }
+                       }
+               }()
+       }
+
+       // close entryChan once every worker has finished so the loop below ends
+       go func() {
+               wg.Wait()
+               close(entryChan)
+       }()
+
+       for _, m := range manifestList {
+               manifestChan <- m
+       }
+       close(manifestChan)
+
+Loop:
+       for {
+               select {
+               case <-ctx.Done():
+                       // Cause yields the worker's error when one was recorded
+                       return nil, context.Cause(ctx)
+               case entries, ok := <-entryChan:
+                       if !ok {
+                               // closed!
+                               break Loop
+                       }
+
+                       for _, e := range entries {
+                               df := e.DataFile()
+                               switch df.ContentType() {
+                               case iceberg.EntryContentData:
+                                       dataEntries = append(dataEntries, e)
+                               case iceberg.EntryContentPosDeletes:
+                                       positionalDeleteEntries = append(
+                                               positionalDeleteEntries, e)
+                               case iceberg.EntryContentEqDeletes:
+                                       return nil, fmt.Errorf("iceberg-go does not yet support equality deletes")
+                               default:
+                                       return nil, fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
+                                               ErrInvalidMetadata, df.ContentType(), e)
+                               }
+                       }
+               }
+       }
+
+       // the channel close and a cancellation can become ready together, in
+       // which case select may have picked the close; do not return a partial
+       // plan as if it were complete
+       if err := context.Cause(ctx); err != nil {
+               return nil, err
+       }
+
+       slices.SortFunc(positionalDeleteEntries,
+               func(a, b iceberg.ManifestEntry) int {
+                       return cmp.Compare(a.SequenceNum(), b.SequenceNum())
+               })
+
+       results := make([]FileScanTask, 0, len(dataEntries))
+       for _, e := range dataEntries {
+               deleteFiles, err := matchDeletesToData(e, positionalDeleteEntries)
+               if err != nil {
+                       return nil, err
+               }
+
+               results = append(results, FileScanTask{
+                       File:        e.DataFile(),
+                       DeleteFiles: deleteFiles,
+                       Start:       0,
+                       Length:      e.DataFile().FileSizeBytes(),
+               })
+       }
+
+       return results, nil
+}
+
+// FileScanTask describes one data file selected by scan planning together
+// with the delete files that were matched to it.
+type FileScanTask struct {
+       // File is the data file to read.
+       File iceberg.DataFile
+       // DeleteFiles holds the delete files matched to File.
+       DeleteFiles []iceberg.DataFile
+       // Start and Length are filled in by PlanFiles with 0 and the size of
+       // File in bytes respectively.
+       Start  int64
+       Length int64
+}
diff --git a/table/scanner_test.go b/table/scanner_test.go
new file mode 100644
index 0000000..a35d2dd
--- /dev/null
+++ b/table/scanner_test.go
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build integration
+
+package table_test
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestScanner(t *testing.T) {
+       cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181";)
+       require.NoError(t, err)
+
+       props := iceberg.Properties{
+               io.S3Region:      "us-east-1",
+               io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password"}
+
+       tests := []struct {
+               table            string
+               expr             iceberg.BooleanExpression
+               expectedNumTasks int
+       }{
+               {"test_all_types", iceberg.AlwaysTrue{}, 5},
+               {"test_all_types", 
iceberg.LessThan(iceberg.Reference("intCol"), int32(3)), 3},
+               {"test_all_types", 
iceberg.GreaterThanEqual(iceberg.Reference("intCol"), int32(3)), 2},
+               {"test_partitioned_by_identity",
+                       iceberg.GreaterThanEqual(iceberg.Reference("ts"), 
"2023-03-05T00:00:00+00:00"), 8},
+               {"test_partitioned_by_identity",
+                       iceberg.LessThan(iceberg.Reference("ts"), 
"2023-03-05T00:00:00+00:00"), 4},
+               {"test_partitioned_by_years", iceberg.AlwaysTrue{}, 2},
+               {"test_partitioned_by_years", 
iceberg.LessThan(iceberg.Reference("dt"), "2023-03-05"), 1},
+               {"test_partitioned_by_years", 
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
+               {"test_partitioned_by_months", 
iceberg.GreaterThanEqual(iceberg.Reference("dt"), "2023-03-05"), 1},
+               {"test_partitioned_by_days", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 
8},
+               {"test_partitioned_by_hours", 
iceberg.GreaterThanEqual(iceberg.Reference("ts"), "2023-03-05T00:00:00+00:00"), 
8},
+               {"test_partitioned_by_truncate", 
iceberg.GreaterThanEqual(iceberg.Reference("letter"), "e"), 8},
+               {"test_partitioned_by_bucket", 
iceberg.GreaterThanEqual(iceberg.Reference("number"), int32(5)), 6},
+               // for some reason when I run the provisioning locally i get 5 
data files
+               // but GHA CI running spark provisioning ends up with only 4 
files?
+               // anyone know why?
+               {"test_uuid_and_fixed_unpartitioned", iceberg.AlwaysTrue{}, 4},
+               {"test_uuid_and_fixed_unpartitioned", 
iceberg.EqualTo(iceberg.Reference("uuid_col"), 
"102cb62f-e6f8-4eb0-9973-d9b012ff0967"), 1},
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.table+" "+tt.expr.String(), func(t *testing.T) {
+                       ident := catalog.ToRestIdentifier("default", tt.table)
+
+                       tbl, err := cat.LoadTable(context.Background(), ident, 
props)
+                       require.NoError(t, err)
+
+                       scan := tbl.Scan(tt.expr, 0, true, "*")
+                       tasks, err := scan.PlanFiles(context.Background())
+                       require.NoError(t, err)
+
+                       assert.Len(t, tasks, tt.expectedNumTasks)
+               })
+       }
+}
+
+// TestScannerWithDeletes plans scans of the test_positional_mor_deletes
+// table at its current state and at the refs "tag_12" and "without_5",
+// checking how many delete files are attached to the single planned task.
+func TestScannerWithDeletes(t *testing.T) {
+       cat, err := catalog.NewRestCatalog("rest", "http://localhost:8181")
+       require.NoError(t, err)
+
+       props := iceberg.Properties{
+               io.S3Region:      "us-east-1",
+               io.S3AccessKeyID: "admin", io.S3SecretAccessKey: "password"}
+
+       ident := catalog.ToRestIdentifier("default",
+               "test_positional_mor_deletes")
+
+       tbl, err := cat.LoadTable(context.Background(), ident, props)
+       require.NoError(t, err)
+
+       scan := tbl.Scan(iceberg.AlwaysTrue{}, 0, true, "*")
+       tasks, err := scan.PlanFiles(context.Background())
+       require.NoError(t, err)
+
+       assert.Len(t, tasks, 1)
+       assert.Len(t, tasks[0].DeleteFiles, 1)
+
+       tagScan, err := scan.UseRef("tag_12")
+       require.NoError(t, err)
+
+       tasks, err = tagScan.PlanFiles(context.Background())
+       require.NoError(t, err)
+
+       assert.Len(t, tasks, 1)
+       assert.Len(t, tasks[0].DeleteFiles, 0)
+
+       // switching refs on a scan that already uses one is expected to fail
+       _, err = tagScan.UseRef("without_5")
+       assert.ErrorIs(t, err, iceberg.ErrInvalidArgument)
+
+       tagScan, err = scan.UseRef("without_5")
+       require.NoError(t, err)
+
+       tasks, err = tagScan.PlanFiles(context.Background())
+       require.NoError(t, err)
+
+       assert.Len(t, tasks, 1)
+       assert.Len(t, tasks[0].DeleteFiles, 1)
+}
diff --git a/table/table.go b/table/table.go
index ff370ec..b4cf92c 100644
--- a/table/table.go
+++ b/table/table.go
@@ -59,6 +59,23 @@ func (t Table) Schemas() map[int]*iceberg.Schema {
        return m
 }
 
+// Scan builds a Scan over the table's metadata and file IO using the given
+// row filter, case sensitivity and selected field names. A snapshotID of 0
+// leaves the scan without an explicit snapshot id.
+func (t Table) Scan(
+       rowFilter iceberg.BooleanExpression,
+       snapshotID int64,
+       caseSensitive bool,
+       fields ...string,
+) *Scan {
+       scan := new(Scan)
+       scan.metadata = t.metadata
+       scan.io = t.fs
+       scan.rowFilter = rowFilter
+       scan.selectedFields = fields
+       scan.caseSensitive = caseSensitive
+
+       if snapshotID != 0 {
+               scan.snapshotID = &snapshotID
+       }
+
+       // the projection cache is bound to this scan instance
+       scan.partitionFilters = newKeyDefaultMapWrapErr(
+               scan.buildPartitionProjection)
+       return scan
+}
+
 func New(ident Identifier, meta Metadata, location string, fs io.IO) *Table {
        return &Table{
                identifier:       ident,
diff --git a/transforms.go b/transforms.go
index 81e85b0..887d46b 100644
--- a/transforms.go
+++ b/transforms.go
@@ -19,10 +19,18 @@ package iceberg
 
 import (
        "encoding"
+       "encoding/binary"
        "fmt"
+       "math"
+       "math/big"
        "strconv"
        "strings"
        "time"
+       "unsafe"
+
+       "github.com/apache/arrow/go/v16/arrow/decimal128"
+       "github.com/google/uuid"
+       "github.com/twmb/murmur3"
 )
 
 // ParseTransform takes the string representation of a transform as
@@ -75,6 +83,9 @@ type Transform interface {
        fmt.Stringer
        encoding.TextMarshaler
        ResultType(t Type) Type
+       Equals(Transform) bool
+       Apply(Optional[Literal]) Optional[Literal]
+       Project(name string, pred BoundPredicate) (UnboundPredicate, error)
 }
 
 // IdentityTransform uses the identity function, performing no transformation
@@ -89,6 +100,32 @@ func (IdentityTransform) String() string { return 
"identity" }
 
 func (IdentityTransform) ResultType(t Type) Type { return t }
 
+func (IdentityTransform) Equals(other Transform) bool {
+       _, ok := other.(IdentityTransform)
+       return ok
+}
+
+// Apply returns value unchanged.
+func (IdentityTransform) Apply(value Optional[Literal]) Optional[Literal] {
+       return value
+}
+
+// Project re-expresses pred as an unbound predicate on the partition field
+// called name. A predicate whose term is a *BoundTransform is handed to
+// projectTransformPredicate; otherwise the operation and literals are kept
+// as they are. It returns nil, nil for predicate kinds it does not handle.
+func (t IdentityTransform) Project(
+       name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       if _, isTransform := pred.Term().(*BoundTransform); isTransform {
+               return projectTransformPredicate(t, name, pred)
+       }
+
+       ref := Reference(name)
+       switch p := pred.(type) {
+       case BoundUnaryPredicate:
+               return p.AsUnbound(ref), nil
+       case BoundLiteralPredicate:
+               return p.AsUnbound(ref, p.Literal()), nil
+       case BoundSetPredicate:
+               return p.AsUnbound(ref, p.Literals().Members()), nil
+       default:
+               return nil, nil
+       }
+}
+
 // VoidTransform is a transformation that always returns nil.
 type VoidTransform struct{}
 
@@ -100,6 +137,19 @@ func (VoidTransform) String() string { return "void" }
 
 func (VoidTransform) ResultType(t Type) Type { return t }
 
+func (VoidTransform) Equals(other Transform) bool {
+       _, ok := other.(VoidTransform)
+       return ok
+}
+
+// Apply ignores value and always returns an empty (not valid) Optional.
+func (VoidTransform) Apply(value Optional[Literal]) Optional[Literal] {
+       return Optional[Literal]{}
+}
+
+// Project never produces a predicate: it always returns nil, nil.
+func (VoidTransform) Project(
+       _ string, _ BoundPredicate,
+) (UnboundPredicate, error) {
+       return nil, nil
+}
+
 // BucketTransform transforms values into a bucket partition value. It is
 // parameterized by a number of buckets. Bucket partition transforms use
 // a 32-bit hash of the source value to produce a positive value by mod
@@ -116,6 +166,138 @@ func (t BucketTransform) String() string { return 
fmt.Sprintf("bucket[%d]", t.Nu
 
 func (BucketTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
 
+// hashHelperInt asserts v to T, widens it to 64 bits, encodes it as 8
+// little-endian bytes and returns the 32-bit murmur3 hash of those bytes.
+// It panics if v does not hold a T.
+func hashHelperInt[T ~int32 | ~int64](v any) uint32 {
+       var encoded [8]byte
+       binary.LittleEndian.PutUint64(encoded[:], uint64(v.(T)))
+       return murmur3.Sum32(encoded[:])
+}
+
+// Equals reports whether other is a BucketTransform with the same number
+// of buckets.
+func (t BucketTransform) Equals(other Transform) bool {
+       if rhs, isBucket := other.(BucketTransform); isBucket {
+               return rhs.NumBuckets == t.NumBuckets
+       }
+
+       return false
+}
+
+// Apply hashes the literal with murmur3 and maps the hash onto a bucket
+// number in [0, NumBuckets). Integer-like literals are hashed as 64-bit
+// values via hashHelperInt; byte, string, UUID and decimal literals are
+// hashed from their bytes. An invalid input or an unsupported literal type
+// yields an empty Optional.
+func (t BucketTransform) Apply(value Optional[Literal]) Optional[Literal] {
+       var none Optional[Literal]
+       if !value.Valid {
+               return none
+       }
+
+       var sum uint32
+       switch lit := value.Val.(type) {
+       case TypedLiteral[[]byte]:
+               sum = murmur3.Sum32(lit.Value())
+       case StringLiteral:
+               // view the string's bytes without copying
+               s := string(lit)
+               sum = murmur3.Sum32(unsafe.Slice(unsafe.StringData(s), len(s)))
+       case UUIDLiteral:
+               sum = murmur3.Sum32(lit[:])
+       case DecimalLiteral:
+               // the marshalling error is deliberately not inspected here
+               raw, _ := lit.MarshalBinary()
+               sum = murmur3.Sum32(raw)
+       case Int32Literal:
+               sum = hashHelperInt[int64](int64(lit))
+       case Int64Literal:
+               sum = hashHelperInt[int64](int64(lit))
+       case DateLiteral:
+               sum = hashHelperInt[int64](int64(lit))
+       case TimeLiteral:
+               sum = hashHelperInt[int64](int64(lit))
+       case TimestampLiteral:
+               sum = hashHelperInt[int64](int64(lit))
+       default:
+               return none
+       }
+
+       // clear the sign bit before taking the modulus
+       nonNegative := int32(sum) & math.MaxInt32
+       bucket := nonNegative % int32(t.NumBuckets)
+       return Optional[Literal]{Valid: true, Val: Int32Literal(bucket)}
+}
+
+// Transformer returns a function that maps a raw value of the source type
+// src to its bucket number. A nil input yields an empty Optional.
+//
+// For a source type not listed in the switch no hash function is selected,
+// so calling the returned function with a non-nil value panics.
+func (t BucketTransform) Transformer(src Type) func(any) Optional[int32] {
+       var hashFn func(any) uint32
+
+       switch src.(type) {
+       case Int32Type:
+               hashFn = hashHelperInt[int32]
+       case DateType:
+               hashFn = hashHelperInt[Date]
+       case Int64Type:
+               hashFn = hashHelperInt[int64]
+       case TimeType:
+               hashFn = hashHelperInt[Time]
+       case TimestampType, TimestampTzType:
+               hashFn = hashHelperInt[Timestamp]
+       case DecimalType:
+               hashFn = func(v any) uint32 {
+                       raw, _ := DecimalLiteral(v.(Decimal)).MarshalBinary()
+                       return murmur3.Sum32(raw)
+               }
+       case StringType, FixedType, BinaryType:
+               hashFn = func(v any) uint32 {
+                       raw, isBytes := v.([]byte)
+                       if !isBytes {
+                               // anything else must be a string; view its bytes
+                               s := v.(string)
+                               raw = unsafe.Slice(unsafe.StringData(s), len(s))
+                       }
+                       return murmur3.Sum32(raw)
+               }
+       case UUIDType:
+               hashFn = func(v any) uint32 {
+                       raw, isBytes := v.([]byte)
+                       if !isBytes {
+                               id := v.(uuid.UUID)
+                               raw = id[:]
+                       }
+                       return murmur3.Sum32(raw)
+               }
+       }
+
+       return func(v any) Optional[int32] {
+               if v == nil {
+                       return Optional[int32]{}
+               }
+
+               // clear the sign bit before taking the modulus
+               nonNegative := int32(hashFn(v)) & math.MaxInt32
+               return Optional[int32]{
+                       Valid: true,
+                       Val:   nonNegative % int32(t.NumBuckets),
+               }
+       }
+}
+
+// Project re-expresses pred as an unbound predicate on the partition field
+// called name. Unary predicates are carried over as they are; only equality
+// and IN predicates have their literals bucketed, and every other predicate
+// yields nil, nil. A predicate whose term is a *BoundTransform is handed to
+// projectTransformPredicate.
+func (t BucketTransform) Project(
+       name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       if _, isTransform := pred.Term().(*BoundTransform); isTransform {
+               return projectTransformPredicate(t, name, pred)
+       }
+
+       transformer := t.Transformer(pred.Term().Type())
+       switch p := pred.(type) {
+       case BoundUnaryPredicate:
+               return p.AsUnbound(Reference(name)), nil
+       case BoundLiteralPredicate:
+               if p.Op() == OpEQ {
+                       lit := transformLiteral(transformer, p.Literal())
+                       return p.AsUnbound(Reference(name), lit), nil
+               }
+       case BoundSetPredicate:
+               if p.Op() == OpIn {
+                       return setApplyTransform(name, p, transformer), nil
+               }
+       }
+
+       return nil, nil
+}
+
 // TruncateTransform is a transformation for truncating a value to a specified 
width.
 type TruncateTransform struct {
        Width int
@@ -129,8 +311,177 @@ func (t TruncateTransform) String() string { return 
fmt.Sprintf("truncate[%d]",
 
 func (TruncateTransform) ResultType(t Type) Type { return t }
 
+// Equals reports whether other is a TruncateTransform with the same width.
+func (t TruncateTransform) Equals(other Transform) bool {
+       if rhs, isTruncate := other.(TruncateTransform); isTruncate {
+               return rhs.Width == t.Width
+       }
+
+       return false
+}
+
+// Transformer returns a function that truncates a raw value of the source
+// type src to the transform's width:
+//
+//   - int32, int64 and Decimal values are rounded down (toward negative
+//     infinity) to a multiple of Width, so -1 with width 10 becomes -10;
+//   - strings are cut to at most Width code points;
+//   - byte slices are cut to at most Width bytes.
+//
+// The returned function maps nil to nil. An error wrapping
+// ErrInvalidArgument is returned for any other source type.
+func (t TruncateTransform) Transformer(src Type) (func(any) any, error) {
+       switch src.(type) {
+       case Int32Type:
+               width := int32(t.Width)
+               return func(v any) any {
+                       if v == nil {
+                               return nil
+                       }
+
+                       val := v.(int32)
+                       // Go's % takes the sign of the dividend, so normalise
+                       // the remainder to be non-negative before subtracting
+                       return val - (((val % width) + width) % width)
+               }, nil
+       case Int64Type:
+               width := int64(t.Width)
+               return func(v any) any {
+                       if v == nil {
+                               return nil
+                       }
+
+                       val := v.(int64)
+                       // see the int32 case for why the remainder is normalised
+                       return val - (((val % width) + width) % width)
+               }, nil
+       case StringType, BinaryType:
+               return func(v any) any {
+                       switch v := v.(type) {
+                       case string:
+                               // ranging over a string yields the byte offset of
+                               // each code point, so cut at the offset of the
+                               // first code point past the width
+                               count := 0
+                               for offset := range v {
+                                       if count == t.Width {
+                                               return v[:offset]
+                                       }
+                                       count++
+                               }
+                               return v
+                       case []byte:
+                               return v[:min(len(v), t.Width)]
+                       default:
+                               return nil
+                       }
+               }, nil
+       case DecimalType:
+               bigWidth := big.NewInt(int64(t.Width))
+               return func(v any) any {
+                       if v == nil {
+                               return nil
+                       }
+
+                       val := v.(Decimal)
+                       unscaled := val.Val.BigInt()
+                       // unscaled - (((unscaled % width) + width) % width)
+                       applied := (&big.Int{}).Mod(unscaled, bigWidth)
+                       applied.Add(applied, bigWidth).Mod(applied, bigWidth)
+                       val.Val = decimal128.FromBigInt(
+                               unscaled.Sub(unscaled, applied))
+                       return val
+               }, nil
+       }
+
+       return nil, fmt.Errorf("%w: cannot truncate for type %s",
+               ErrInvalidArgument, src)
+}
+
+// Apply truncates a literal using the function Transformer returns for the
+// literal's type. An invalid input, or a literal type Transformer rejects,
+// yields an empty Optional.
+func (t TruncateTransform) Apply(value Optional[Literal]) Optional[Literal] {
+       if !value.Valid {
+               return Optional[Literal]{}
+       }
+
+       truncate, err := t.Transformer(value.Val.Type())
+       if err != nil {
+               return Optional[Literal]{}
+       }
+
+       // unwrap the literal, truncate the raw value, then wrap it again
+       var result Literal
+       switch lit := value.Val.(type) {
+       case Int32Literal:
+               result = Int32Literal(truncate(int32(lit)).(int32))
+       case Int64Literal:
+               result = Int64Literal(truncate(int64(lit)).(int64))
+       case DecimalLiteral:
+               result = DecimalLiteral(truncate(Decimal(lit)).(Decimal))
+       case StringLiteral:
+               result = StringLiteral(truncate(string(lit)).(string))
+       case BinaryLiteral:
+               result = BinaryLiteral(truncate([]byte(lit)).([]byte))
+       }
+
+       return Optional[Literal]{Valid: true, Val: result}
+}
+
+// Project re-expresses pred as an unbound predicate on the partition field
+// called name. The truncation function is chosen from the type of the
+// referenced field, and an error is returned when that type cannot be
+// truncated. Unary predicates are carried over as they are, IN predicates
+// go through setApplyTransform, and literal predicates go through
+// truncateNumber or truncateArray depending on the field type. Everything
+// else yields nil, nil. A predicate whose term is a *BoundTransform is
+// handed to projectTransformPredicate.
+func (t TruncateTransform) Project(
+       name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       if _, isTransform := pred.Term().(*BoundTransform); isTransform {
+               return projectTransformPredicate(t, name, pred)
+       }
+
+       fieldType := pred.Term().Ref().Field().Type
+       transformer, err := t.Transformer(fieldType)
+       if err != nil {
+               return nil, err
+       }
+
+       switch p := pred.(type) {
+       case BoundUnaryPredicate:
+               return p.AsUnbound(Reference(name)), nil
+       case BoundSetPredicate:
+               if p.Op() == OpIn {
+                       switch fieldType.(type) {
+                       case Int32Type:
+                               return setApplyTransform(name, p,
+                                       wrapTransformFn[int32](transformer)), nil
+                       case Int64Type:
+                               return setApplyTransform(name, p,
+                                       wrapTransformFn[int64](transformer)), nil
+                       case DecimalType:
+                               return setApplyTransform(name, p,
+                                       wrapTransformFn[Decimal](transformer)), nil
+                       case StringType:
+                               return setApplyTransform(name, p,
+                                       wrapTransformFn[string](transformer)), nil
+                       case BinaryType:
+                               return setApplyTransform(name, p,
+                                       wrapTransformFn[[]byte](transformer)), nil
+                       }
+               }
+       case BoundLiteralPredicate:
+               switch fieldType.(type) {
+               case Int32Type:
+                       return truncateNumber(name, p,
+                               wrapTransformFn[int32](transformer))
+               case Int64Type:
+                       return truncateNumber(name, p,
+                               wrapTransformFn[int64](transformer))
+               case DecimalType:
+                       return truncateNumber(name, p,
+                               wrapTransformFn[Decimal](transformer))
+               case StringType:
+                       return truncateArray(name, p,
+                               wrapTransformFn[string](transformer))
+               case BinaryType:
+                       return truncateArray(name, p,
+                               wrapTransformFn[[]byte](transformer))
+               }
+       }
+
+       return nil, nil
+}
+
 var epochTM = time.Unix(0, 0).UTC()
 
+// timeTransform is a Transform that can also supply, for a given source
+// type, a function mapping a raw value to an optional int32. It is the
+// argument type accepted by projectTimeTransform.
+type timeTransform interface {
+       Transform
+       Transformer(Type) (func(any) Optional[int32], error)
+}
+
+// projectTimeTransform holds the projection logic shared by the
+// year/month/day/hour transforms. It re-expresses pred as an unbound
+// predicate on the partition field called name: unary predicates are
+// carried over as they are, literal predicates go through truncateNumber,
+// IN predicates go through setApplyTransform, and everything else yields
+// nil, nil. An error is returned when t has no transformer for the type of
+// the referenced field. A predicate whose term is a *BoundTransform is
+// handed to projectTransformPredicate.
+func projectTimeTransform(
+       t timeTransform, name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       if _, isTransform := pred.Term().(*BoundTransform); isTransform {
+               return projectTransformPredicate(t, name, pred)
+       }
+
+       transformer, err := t.Transformer(pred.Term().Ref().Type())
+       if err != nil {
+               return nil, err
+       }
+
+       switch p := pred.(type) {
+       case BoundUnaryPredicate:
+               return p.AsUnbound(Reference(name)), nil
+       case BoundLiteralPredicate:
+               return truncateNumber(name, p, transformer)
+       case BoundSetPredicate:
+               if p.Op() == OpIn {
+                       return setApplyTransform(name, p, transformer), nil
+               }
+       }
+
+       return nil, nil
+}
+
 // YearTransform transforms a datetime value into a year value.
 type YearTransform struct{}
 
@@ -142,6 +493,62 @@ func (YearTransform) String() string { return "year" }
 
 func (YearTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
 
+func (YearTransform) Equals(other Transform) bool {
+       _, ok := other.(YearTransform)
+       return ok
+}
+
+// Transformer returns a function mapping a Date (for date sources) or a
+// Timestamp (for timestamp and timestamptz sources) to its calendar year
+// minus the year of epochTM. A nil input yields an empty Optional. An
+// error wrapping ErrInvalidArgument is returned for any other source type.
+func (YearTransform) Transformer(src Type) (func(any) Optional[int32], error) {
+       // yearsSinceEpoch is the computation shared by both source kinds
+       yearsSinceEpoch := func(tm time.Time) Optional[int32] {
+               return Optional[int32]{
+                       Valid: true,
+                       Val:   int32(tm.Year() - epochTM.Year()),
+               }
+       }
+
+       switch src.(type) {
+       case DateType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return Optional[int32]{}
+                       }
+                       return yearsSinceEpoch(v.(Date).ToTime())
+               }, nil
+       case TimestampType, TimestampTzType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return Optional[int32]{}
+                       }
+                       return yearsSinceEpoch(v.(Timestamp).ToTime())
+               }, nil
+       }
+
+       return nil, fmt.Errorf("%w: cannot apply year transform for type %s",
+               ErrInvalidArgument, src)
+}
+
+// Apply maps a date or timestamp literal to an Int32Literal holding its
+// calendar year minus the year of epochTM. An invalid input or any other
+// literal type yields an empty Optional.
+func (YearTransform) Apply(value Optional[Literal]) (out Optional[Literal]) {
+       if !value.Valid {
+               return out
+       }
+
+       var tm time.Time
+       switch lit := value.Val.(type) {
+       case DateLiteral:
+               tm = Date(lit).ToTime()
+       case TimestampLiteral:
+               tm = Timestamp(lit).ToTime()
+       default:
+               return out
+       }
+
+       out.Valid = true
+       out.Val = Int32Literal(tm.Year() - epochTM.Year())
+       return out
+}
+
+// Project delegates to projectTimeTransform with this transform.
+func (t YearTransform) Project(
+       name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       return projectTimeTransform(t, name, pred)
+}
+
 // MonthTransform transforms a datetime value into a month value.
 type MonthTransform struct{}
 
@@ -153,6 +560,70 @@ func (MonthTransform) String() string { return "month" }
 
 func (MonthTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
 
+func (MonthTransform) Equals(other Transform) bool {
+       _, ok := other.(MonthTransform)
+       return ok
+}
+
+// Transformer returns a function mapping a Date (for date sources) or a
+// Timestamp (for timestamp and timestamptz sources) to the number of
+// calendar months between epochTM and the value. A nil input yields an
+// empty Optional. An error wrapping ErrInvalidArgument is returned for any
+// other source type.
+func (MonthTransform) Transformer(
+       src Type,
+) (func(any) Optional[int32], error) {
+       // monthsSinceEpoch is the computation shared by both source kinds
+       monthsSinceEpoch := func(tm time.Time) Optional[int32] {
+               years := tm.Year() - epochTM.Year()
+               months := int(tm.Month()) - int(epochTM.Month())
+               return Optional[int32]{Valid: true, Val: int32(years*12 + months)}
+       }
+
+       switch src.(type) {
+       case DateType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return Optional[int32]{}
+                       }
+                       return monthsSinceEpoch(v.(Date).ToTime())
+               }, nil
+       case TimestampType, TimestampTzType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return Optional[int32]{}
+                       }
+                       return monthsSinceEpoch(v.(Timestamp).ToTime())
+               }, nil
+       }
+
+       return nil, fmt.Errorf("%w: cannot apply month transform for type %s",
+               ErrInvalidArgument, src)
+}
+
+// Apply maps a date or timestamp literal to an Int32Literal holding the
+// number of calendar months between epochTM and the value. An invalid input
+// or any other literal type yields an empty Optional.
+func (MonthTransform) Apply(value Optional[Literal]) Optional[Literal] {
+       if !value.Valid {
+               return Optional[Literal]{}
+       }
+
+       var tm time.Time
+       switch lit := value.Val.(type) {
+       case DateLiteral:
+               tm = Date(lit).ToTime()
+       case TimestampLiteral:
+               tm = Timestamp(lit).ToTime()
+       default:
+               return Optional[Literal]{}
+       }
+
+       years := tm.Year() - epochTM.Year()
+       months := int(tm.Month()) - int(epochTM.Month())
+       return Optional[Literal]{
+               Valid: true,
+               Val:   Int32Literal(int32(years*12 + months)),
+       }
+}
+
+// Project delegates to projectTimeTransform with this transform.
+func (t MonthTransform) Project(
+       name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       return projectTimeTransform(t, name, pred)
+}
+
 // DayTransform transforms a datetime value into a date value.
 type DayTransform struct{}
 
@@ -164,6 +635,59 @@ func (DayTransform) String() string { return "day" }
 
 func (DayTransform) ResultType(Type) Type { return PrimitiveTypes.Date }
 
+func (DayTransform) Equals(other Transform) bool {
+       _, ok := other.(DayTransform)
+       return ok
+}
+
+// Transformer returns a function mapping a value to an int32 day number:
+// a Date is converted directly, while a Timestamp (for timestamp and
+// timestamptz sources) is first converted with ToDate. A nil input yields
+// an empty Optional. An error wrapping ErrInvalidArgument is returned for
+// any other source type.
+func (DayTransform) Transformer(src Type) (func(any) Optional[int32], error) {
+       var none Optional[int32]
+
+       switch src.(type) {
+       case DateType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return none
+                       }
+
+                       day := int32(v.(Date))
+                       return Optional[int32]{Valid: true, Val: day}
+               }, nil
+       case TimestampType, TimestampTzType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return none
+                       }
+
+                       day := int32(v.(Timestamp).ToDate())
+                       return Optional[int32]{Valid: true, Val: day}
+               }, nil
+       }
+
+       return nil, fmt.Errorf("%w: cannot apply day transform for type %s",
+               ErrInvalidArgument, src)
+}
+
+// Apply maps a date or timestamp literal to an Int32Literal day number; a
+// timestamp is converted with ToDate first. An invalid input or any other
+// literal type yields an empty Optional.
+func (DayTransform) Apply(value Optional[Literal]) Optional[Literal] {
+       if !value.Valid {
+               return Optional[Literal]{}
+       }
+
+       switch lit := value.Val.(type) {
+       case DateLiteral:
+               return Optional[Literal]{Valid: true, Val: Int32Literal(lit)}
+       case TimestampLiteral:
+               day := Int32Literal(Timestamp(lit).ToDate())
+               return Optional[Literal]{Valid: true, Val: day}
+       default:
+               return Optional[Literal]{}
+       }
+}
+
+// Project delegates to projectTimeTransform with this transform.
+func (t DayTransform) Project(
+       name string, pred BoundPredicate,
+) (UnboundPredicate, error) {
+       return projectTimeTransform(t, name, pred)
+}
+
 // HourTransform transforms a datetime value into an hour value.
 type HourTransform struct{}
 
@@ -174,3 +698,174 @@ func (t HourTransform) MarshalText() ([]byte, error) {
 // String returns the textual name of the transform, "hour".
 func (HourTransform) String() string { return "hour" }
 
 // ResultType reports PrimitiveTypes.Int32 as the type produced by the hour
 // transform. The source type argument is ignored.
 func (HourTransform) ResultType(Type) Type { return PrimitiveTypes.Int32 }
+
+func (HourTransform) Equals(other Transform) bool {
+       _, ok := other.(HourTransform)
+       return ok
+}
+
+// hoursSinceEpoch converts a microsecond timestamp into whole hours, rounding
+// toward negative infinity so that instants before the epoch land in the hour
+// that contains them (-1µs is hour -1, not hour 0).
+func hoursSinceEpoch(micros int64) int32 {
+       const microsPerHour = int64(time.Hour / time.Microsecond)
+
+       hours := micros / microsPerHour
+       // Go's integer division truncates toward zero; step down one hour when a
+       // negative value left a remainder.
+       if micros%microsPerHour < 0 {
+               hours--
+       }
+
+       return int32(hours)
+}
+
+// Transformer returns a function that converts a raw Timestamp value into its
+// hour value, wrapped in an Optional[int32].
+//
+// Only TimestampType and TimestampTzType sources are accepted; any other
+// source type yields an error wrapping ErrInvalidArgument. The returned
+// function maps a nil input to an invalid (zero) Optional and panics if a
+// non-nil input is not a Timestamp. Hours are floored, so timestamps before
+// the epoch produce negative hour values.
+func (HourTransform) Transformer(src Type) (func(any) Optional[int32], error) {
+       switch src.(type) {
+       case TimestampType, TimestampTzType:
+               return func(v any) Optional[int32] {
+                       if v == nil {
+                               return Optional[int32]{}
+                       }
+
+                       return Optional[int32]{
+                               Valid: true,
+                               Val:   hoursSinceEpoch(int64(v.(Timestamp))),
+                       }
+               }, nil
+       }
+
+       return nil, fmt.Errorf("%w: cannot apply hour transform for type %s",
+               ErrInvalidArgument, src)
+}
+
+// Apply transforms an optional literal into its hour value.
+//
+// An invalid input yields an invalid result. A TimestampLiteral is converted
+// to an Int32Literal holding the floored hour count, matching Transformer.
+// Any other literal kind yields an invalid (zero) Optional rather than an
+// error.
+func (HourTransform) Apply(value Optional[Literal]) (out Optional[Literal]) {
+       if !value.Valid {
+               return out
+       }
+
+       if ts, ok := value.Val.(TimestampLiteral); ok {
+               out.Valid, out.Val = true, Int32Literal(hoursSinceEpoch(int64(ts)))
+       }
+
+       return out
+}
+
+// Project delegates to projectTimeTransform, passing this transform, name
+// (presumably the partition field name; verify against callers) and the bound
+// predicate, and returns its result unchanged.
+func (t HourTransform) Project(name string, pred BoundPredicate) (UnboundPredicate, error) {
+       return projectTimeTransform(t, name, pred)
+}
+
+// removeTransform rebuilds pred as an unbound predicate whose term is a plain
+// Reference to partName, carrying over the predicate's literal or literal set
+// where it has one.
+//
+// BoundUnaryPredicate, BoundLiteralPredicate and BoundSetPredicate are
+// handled; any other predicate kind produces an error wrapping
+// ErrInvalidArgument.
+func removeTransform(partName string, pred BoundPredicate) (UnboundPredicate, error) {
+       switch bound := pred.(type) {
+       case BoundUnaryPredicate:
+               return bound.AsUnbound(Reference(partName)), nil
+       case BoundLiteralPredicate:
+               return bound.AsUnbound(Reference(partName), bound.Literal()), nil
+       case BoundSetPredicate:
+               return bound.AsUnbound(Reference(partName), bound.Literals().Members()), nil
+       default:
+               return nil, fmt.Errorf("%w: cannot replace transform in unknown predicate: %s",
+                       ErrInvalidArgument, pred)
+       }
+}
+
+// projectTransformPredicate handles predicates whose term is itself a
+// *BoundTransform. When that term's transform equals t, the predicate is
+// rewritten by removeTransform to reference partitionName directly. In every
+// other case it returns (nil, nil), i.e. no projection and no error.
+func projectTransformPredicate(t Transform, partitionName string, pred BoundPredicate) (UnboundPredicate, error) {
+       bt, isTransform := pred.Term().(*BoundTransform)
+       if isTransform && t.Equals(bt.transform) {
+               return removeTransform(partitionName, pred)
+       }
+
+       return nil, nil
+}
+
+// transformLiteral unwraps lit into its underlying Go value, passes that value
+// through fn and wraps the result in a new literal via NewLiteral.
+//
+// Only the Val field of fn's result is used; its Valid flag is ignored. The
+// function panics with "invalid literal type" when lit is not one of the
+// literal kinds listed in the switch below.
+func transformLiteral[T LiteralType](fn func(any) Optional[T], lit Literal) Literal {
+       // Convert the literal to the plain value fn expects.
+       var raw any
+       switch l := lit.(type) {
+       case BoolLiteral:
+               raw = bool(l)
+       case Int32Literal:
+               raw = int32(l)
+       case Int64Literal:
+               raw = int64(l)
+       case Float32Literal:
+               raw = float32(l)
+       case Float64Literal:
+               raw = float64(l)
+       case DateLiteral:
+               raw = Date(l)
+       case TimeLiteral:
+               raw = Time(l)
+       case TimestampLiteral:
+               raw = Timestamp(l)
+       case StringLiteral:
+               raw = string(l)
+       case FixedLiteral:
+               raw = []byte(l)
+       case BinaryLiteral:
+               raw = []byte(l)
+       case UUIDLiteral:
+               raw = uuid.UUID(l)
+       case DecimalLiteral:
+               raw = Decimal(l)
+       default:
+               panic("invalid literal type")
+       }
+
+       return NewLiteral(fn(raw).Val)
+}
+
+// wrapTransformFn adapts an untyped transform function to one returning
+// Optional[T]. A nil result from fn becomes an invalid (zero) Optional; any
+// other result is asserted to T, which panics if fn returned a different
+// dynamic type.
+func wrapTransformFn[T LiteralType](fn func(any) any) func(any) Optional[T] {
+       return func(v any) Optional[T] {
+               if res := fn(v); res != nil {
+                       return Optional[T]{Valid: true, Val: res.(T)}
+               }
+
+               return Optional[T]{}
+       }
+}
+
+// truncateNumber projects a comparison on a numeric column onto the
+// transformed field called name, using fn to transform the boundary literal.
+//
+// Strict bounds are turned into inclusive ones on the neighbouring value
+// before transforming: "< b" becomes "<= fn(b.Decrement())" and "> b" becomes
+// ">= fn(b.Increment())". "<=", ">=" and "==" keep their operator and
+// transform b as-is. Any other operation yields (nil, nil).
+//
+// An error wrapping ErrInvalidArgument is returned when the predicate's
+// literal is not a NumericLiteral.
+func truncateNumber[T LiteralType](name string, pred BoundLiteralPredicate, fn func(any) Optional[T]) (UnboundPredicate, error) {
+       lit := pred.Literal()
+       boundary, ok := lit.(NumericLiteral)
+       if !ok {
+               // boundary is the zero value when the assertion fails, so describe
+               // the literal that was actually received instead of touching it.
+               return nil, fmt.Errorf("%w: expected numeric literal, got %s",
+                       ErrInvalidArgument, lit.Type())
+       }
+
+       switch pred.Op() {
+       case OpLT:
+               return LiteralPredicate(OpLTEQ, Reference(name),
+                       transformLiteral(fn, boundary.Decrement())), nil
+       case OpLTEQ:
+               return LiteralPredicate(OpLTEQ, Reference(name),
+                       transformLiteral(fn, boundary)), nil
+       case OpGT:
+               return LiteralPredicate(OpGTEQ, Reference(name),
+                       transformLiteral(fn, boundary.Increment())), nil
+       case OpGTEQ:
+               return LiteralPredicate(OpGTEQ, Reference(name),
+                       transformLiteral(fn, boundary)), nil
+       case OpEQ:
+               return LiteralPredicate(OpEQ, Reference(name),
+                       transformLiteral(fn, boundary)), nil
+       }
+
+       return nil, nil
+}
+
+// truncateArray projects a comparison onto the transformed field called name,
+// using fn to transform the predicate's literal.
+//
+// "<" is widened to "<=" and ">" to ">="; "<=", ">=", "==", starts-with and
+// not-starts-with keep their operator. In all of these the literal is
+// transformed unchanged (no increment or decrement). Any other operation
+// yields (nil, nil).
+func truncateArray[T LiteralType](name string, pred BoundLiteralPredicate, fn func(any) Optional[T]) (UnboundPredicate, error) {
+       boundary := pred.Literal()
+
+       // Work out the operation of the projected predicate first; every
+       // supported case then builds its result the same way.
+       op := pred.Op()
+       switch op {
+       case OpLT:
+               op = OpLTEQ
+       case OpGT:
+               op = OpGTEQ
+       case OpLTEQ, OpGTEQ, OpEQ, OpStartsWith, OpNotStartsWith:
+               // Operator carries over as-is.
+       default:
+               return nil, nil
+       }
+
+       return LiteralPredicate(op, Reference(name),
+               transformLiteral(fn, boundary)), nil
+}
+
+// setApplyTransform rebuilds a set predicate against the field called name,
+// with every member of the literal set passed through transformLiteral using
+// fn.
+//
+// The transformed literals are collected into a fresh slice so the slice
+// handed out by Members() is never modified, whether or not it aliases the
+// predicate's own storage.
+func setApplyTransform[T LiteralType](name string, pred BoundSetPredicate, fn func(any) Optional[T]) UnboundPredicate {
+       members := pred.Literals().Members()
+       transformed := make([]Literal, 0, len(members))
+       for _, lit := range members {
+               transformed = append(transformed, transformLiteral(fn, lit))
+       }
+
+       return pred.AsUnbound(Reference(name), transformed)
+}

Reply via email to