This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 36b7c933b4 Python: Filter on Datafile metrics (#6714)
36b7c933b4 is described below

commit 36b7c933b444bd3df2137e38eb275e12a8ea7afc
Author: Fokko Driesprong <[email protected]>
AuthorDate: Mon Feb 27 23:02:49 2023 +0100

    Python: Filter on Datafile metrics (#6714)
---
 python/pyiceberg/conversions.py            |   3 +-
 python/pyiceberg/expressions/literals.py   |   3 +
 python/pyiceberg/expressions/visitors.py   | 274 +++++++++-
 python/pyiceberg/io/pyarrow.py             |   4 +-
 python/pyiceberg/table/__init__.py         |  23 +-
 python/tests/expressions/test_evaluator.py | 803 +++++++++++++++++++++++++----
 6 files changed, 1002 insertions(+), 108 deletions(-)

diff --git a/python/pyiceberg/conversions.py b/python/pyiceberg/conversions.py
index 4707dab85c..948ddcdcad 100644
--- a/python/pyiceberg/conversions.py
+++ b/python/pyiceberg/conversions.py
@@ -38,6 +38,7 @@ from typing import (
     Union,
 )
 
+from pyiceberg.typedef import L
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -241,7 +242,7 @@ def _(primitive_type: DecimalType, value: Decimal) -> bytes:
 
 
 @singledispatch
-def from_bytes(primitive_type: PrimitiveType, b: bytes) -> Union[bool, bytes, 
Decimal, float, int, str, uuid.UUID]:
+def from_bytes(primitive_type: PrimitiveType, b: bytes) -> L:
     """A generic function which converts bytes to a built-in python value
 
     Args:
diff --git a/python/pyiceberg/expressions/literals.py b/python/pyiceberg/expressions/literals.py
index 3e3233af0d..ba380cfd67 100644
--- a/python/pyiceberg/expressions/literals.py
+++ b/python/pyiceberg/expressions/literals.py
@@ -328,6 +328,9 @@ class FloatLiteral(Literal[float]):
     def __ge__(self, other: Any) -> bool:
         return self._value32 >= other
 
+    def __hash__(self) -> int:
+        return hash(self._value32)
+
     @singledispatchmethod
     def to(self, type_var: IcebergType) -> Literal:  # type: ignore
         raise TypeError(f"Cannot convert FloatLiteral into {type_var}")
diff --git a/python/pyiceberg/expressions/visitors.py b/python/pyiceberg/expressions/visitors.py
index 81e0ebc559..4fc666bcd9 100644
--- a/python/pyiceberg/expressions/visitors.py
+++ b/python/pyiceberg/expressions/visitors.py
@@ -14,11 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import math
 from abc import ABC, abstractmethod
 from functools import singledispatch
 from typing import (
     Any,
     Callable,
+    Dict,
     Generic,
     List,
     Set,
@@ -56,15 +58,16 @@ from pyiceberg.expressions import (
     UnboundPredicate,
 )
 from pyiceberg.expressions.literals import Literal
-from pyiceberg.manifest import ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFile, ManifestFile, PartitionFieldSummary
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
-from pyiceberg.typedef import StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
     DoubleType,
     FloatType,
     IcebergType,
     PrimitiveType,
+    StructType,
     TimestampType,
     TimestamptzType,
 )
@@ -986,3 +989,270 @@ def expression_to_plain_format(
     # In the form of expr1 ∨ expr2 ∨ ... ∨ exprN
     visitor = ExpressionToPlainFormat(cast_int_to_datetime)
     return [visit(expression, visitor) for expression in expressions]
+
+
+class _InclusiveMetricsEvaluator(BoundBooleanExpressionVisitor[bool]):
+    struct: StructType
+    expr: BooleanExpression
+
+    value_counts: Dict[int, int]
+    null_counts: Dict[int, int]
+    nan_counts: Dict[int, int]
+    lower_bounds: Dict[int, bytes]
+    upper_bounds: Dict[int, bytes]
+
+    def __init__(self, schema: Schema, expr: BooleanExpression, 
case_sensitive: bool = True) -> None:
+        self.struct = schema.as_struct()
+        self.expr = bind(schema, rewrite_not(expr), case_sensitive)
+
+    def eval(self, file: DataFile) -> bool:
+        """Test whether the file may contain records that match the 
expression."""
+
+        if file.record_count == 0:
+            return ROWS_CANNOT_MATCH
+
+        if file.record_count < 0:
+            # Older version don't correctly implement record count from avro 
file and thus
+            # set record count -1 when importing avro tables to iceberg 
tables. This should
+            # be updated once we implemented and set correct record count.
+            return ROWS_MIGHT_MATCH
+
+        self.value_counts = file.value_counts or EMPTY_DICT
+        self.null_counts = file.null_value_counts or EMPTY_DICT
+        self.nan_counts = file.nan_value_counts or EMPTY_DICT
+        self.lower_bounds = file.lower_bounds or EMPTY_DICT
+        self.upper_bounds = file.upper_bounds or EMPTY_DICT
+
+        return visit(self.expr, self)
+
+    def _contains_nulls_only(self, field_id: int) -> bool:
+        if (value_count := self.value_counts.get(field_id)) and (null_count := 
self.null_counts.get(field_id)):
+            return value_count == null_count
+        return False
+
+    def _contains_nans_only(self, field_id: int) -> bool:
+        if (nan_count := self.nan_counts.get(field_id)) and (value_count := 
self.value_counts.get(field_id)):
+            return nan_count == value_count
+        return False
+
+    def _is_nan(self, val: Any) -> bool:
+        try:
+            return math.isnan(val)
+        except TypeError:
+            # In the case of None or other non-numeric types
+            return False
+
+    def visit_true(self) -> bool:
+        # all rows match
+        return ROWS_MIGHT_MATCH
+
+    def visit_false(self) -> bool:
+        # all rows fail
+        return ROWS_CANNOT_MATCH
+
+    def visit_not(self, child_result: bool) -> bool:
+        raise ValueError(f"NOT should be rewritten: {child_result}")
+
+    def visit_and(self, left_result: bool, right_result: bool) -> bool:
+        return left_result and right_result
+
+    def visit_or(self, left_result: bool, right_result: bool) -> bool:
+        return left_result or right_result
+
+    def visit_is_null(self, term: BoundTerm[L]) -> bool:
+        field_id = term.ref().field.field_id
+
+        if self.null_counts.get(field_id) == 0:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_null(self, term: BoundTerm[L]) -> bool:
+        # no need to check whether the field is required because binding 
evaluates that case
+        # if the column has no non-null values, the expression cannot match
+        field_id = term.ref().field.field_id
+
+        if self._contains_nulls_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
+        field_id = term.ref().field.field_id
+
+        if self.nan_counts.get(field_id) == 0:
+            return ROWS_CANNOT_MATCH
+
+        # when there's no nanCounts information, but we already know the 
column only contains null,
+        # it's guaranteed that there's no NaN value
+        if self._contains_nulls_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
+        field_id = term.ref().field.field_id
+
+        if self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        field = term.ref().field
+        field_id = field.field_id
+
+        if self._contains_nulls_only(field_id) or 
self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        if not isinstance(field.field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field.field_type}")
+
+        if lower_bound_bytes := self.lower_bounds.get(field_id):  # type: 
ignore
+            lower_bound = from_bytes(field.field_type, lower_bound_bytes)  # 
type: ignore
+
+            if self._is_nan(lower_bound):
+                # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            if lower_bound >= literal.value:
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> bool:
+        field = term.ref().field
+        field_id = field.field_id
+
+        if self._contains_nulls_only(field_id) or 
self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        if not isinstance(field.field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field.field_type}")
+
+        if lower_bound_bytes := self.lower_bounds.get(field_id):  # type: 
ignore
+            lower_bound = from_bytes(field.field_type, lower_bound_bytes)  # 
type: ignore
+            if self._is_nan(lower_bound):
+                # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            if lower_bound > literal.value:
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> 
bool:
+        field = term.ref().field
+        field_id = field.field_id
+
+        if self._contains_nulls_only(field_id) or 
self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        if not isinstance(field.field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field.field_type}")
+
+        if upper_bound_bytes := self.upper_bounds.get(field_id):  # type: 
ignore
+            upper_bound = from_bytes(field.field_type, upper_bound_bytes)  # 
type: ignore
+            if upper_bound <= literal.value:
+                if self._is_nan(upper_bound):
+                    # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                    return ROWS_MIGHT_MATCH
+
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: 
Literal[L]) -> bool:
+        field = term.ref().field
+        field_id = field.field_id
+
+        if self._contains_nulls_only(field_id) or 
self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        if not isinstance(field.field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field.field_type}")
+
+        if upper_bound_bytes := self.upper_bounds.get(field_id):  # type: 
ignore
+            upper_bound = from_bytes(field.field_type, upper_bound_bytes)  # 
type: ignore
+            if upper_bound < literal.value:
+                if self._is_nan(upper_bound):
+                    # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                    return ROWS_MIGHT_MATCH
+
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        field = term.ref().field
+        field_id = field.field_id
+
+        if self._contains_nulls_only(field_id) or 
self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        if not isinstance(field.field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field.field_type}")
+
+        if lower_bound_bytes := self.lower_bounds.get(field_id):  # type: 
ignore
+            lower_bound = from_bytes(field.field_type, lower_bound_bytes)  # 
type: ignore
+            if self._is_nan(lower_bound):
+                # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            if lower_bound > literal.value:
+                return ROWS_CANNOT_MATCH
+
+        if upper_bound_bytes := self.upper_bounds.get(field_id):  # type: 
ignore
+            upper_bound = from_bytes(field.field_type, upper_bound_bytes)  # 
type: ignore
+            if self._is_nan(upper_bound):
+                # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            if upper_bound < literal.value:
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
+        field = term.ref().field
+        field_id = field.field_id
+
+        if self._contains_nulls_only(field_id) or 
self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            # skip evaluating the predicate if the number of values is too big
+            return ROWS_MIGHT_MATCH
+
+        if not isinstance(field.field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field.field_type}")
+
+        if lower_bound_bytes := self.lower_bounds.get(field_id):  # type: 
ignore
+            lower_bound = from_bytes(field.field_type, lower_bound_bytes)  # 
type: ignore
+            if self._is_nan(lower_bound):
+                # NaN indicates unreliable bounds. See the 
InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            literals = {lit for lit in literals if lower_bound <= lit}
+            if len(literals) == 0:
+                return ROWS_CANNOT_MATCH
+
+        if upper_bound_bytes := self.upper_bounds.get(field_id):  # type: 
ignore
+            upper_bound = from_bytes(field.field_type, upper_bound_bytes)  # 
type: ignore
+            # this is different from Java, here NaN is always larger
+            if self._is_nan(upper_bound):
+                return ROWS_MIGHT_MATCH
+
+            literals = {lit for lit in literals if upper_bound >= lit}
+            if len(literals) == 0:
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
+        # because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a 
value in col.
+        return ROWS_MIGHT_MATCH
diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index a064ae3bdf..d2cd762273 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -555,8 +555,10 @@ def project_table(
 
     if len(tables) > 1:
         return pa.concat_tables(tables)
-    else:
+    elif len(tables) == 1:
         return tables[0]
+    else:
+        return pa.Table.from_batches([], 
schema=schema_to_pyarrow(projected_schema))
 
 
 def to_requested_schema(requested_schema: Schema, file_schema: Schema, table: 
pa.Table) -> pa.Table:
diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py
index 7c53fd0967..69ce08f457 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -43,7 +43,7 @@ from pyiceberg.expressions import (
     parser,
     visitors,
 )
-from pyiceberg.expressions.visitors import inclusive_projection
+from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator, 
inclusive_projection
 from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.manifest import (
     DataFile,
@@ -314,11 +314,16 @@ def _check_content(file: DataFile) -> DataFile:
         return file
 
 
-def _open_manifest(io: FileIO, manifest: ManifestFile, partition_filter: 
Callable[[DataFile], bool]) -> List[FileScanTask]:
+def _open_manifest(
+    io: FileIO,
+    manifest: ManifestFile,
+    partition_filter: Callable[[DataFile], bool],
+    metrics_evaluator: Callable[[DataFile], bool],
+) -> List[FileScanTask]:
     all_files = files(io.new_input(manifest.manifest_path))
     matching_partition_files = filter(partition_filter, all_files)
     matching_partition_data_files = map(_check_content, 
matching_partition_files)
-    return [FileScanTask(file) for file in matching_partition_data_files]
+    return [FileScanTask(file) for file in matching_partition_data_files if 
metrics_evaluator(file)]
 
 
 class DataScan(TableScan):
@@ -376,12 +381,20 @@ class DataScan(TableScan):
         # this filter depends on the partition spec used to write the manifest 
file
 
         partition_evaluators: Dict[int, Callable[[DataFile], bool]] = 
KeyDefaultDict(self._build_partition_evaluator)
-
+        metrics_evaluator = _InclusiveMetricsEvaluator(self.table.schema(), 
self.row_filter, self.case_sensitive).eval
         with ThreadPool() as pool:
             return chain(
                 *pool.starmap(
                     func=_open_manifest,
-                    iterable=[(io, manifest, 
partition_evaluators[manifest.partition_spec_id]) for manifest in manifests],
+                    iterable=[
+                        (
+                            io,
+                            manifest,
+                            partition_evaluators[manifest.partition_spec_id],
+                            metrics_evaluator,
+                        )
+                        for manifest in manifests
+                    ],
                 )
             )
 
diff --git a/python/tests/expressions/test_evaluator.py b/python/tests/expressions/test_evaluator.py
index b57ede62f2..7de9c04fb7 100644
--- a/python/tests/expressions/test_evaluator.py
+++ b/python/tests/expressions/test_evaluator.py
@@ -14,11 +14,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Optional
+# pylint:disable=redefined-outer-name
+from typing import Any
 
+import pytest
+
+from pyiceberg.conversions import to_bytes
 from pyiceberg.expressions import (
-    AlwaysFalse,
-    AlwaysTrue,
     And,
     EqualTo,
     GreaterThan,
@@ -35,145 +37,748 @@ from pyiceberg.expressions import (
     NotNull,
     Or,
 )
-from pyiceberg.expressions.visitors import expression_evaluator
+from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
+from pyiceberg.manifest import DataFile, FileFormat
 from pyiceberg.schema import Schema
-from pyiceberg.typedef import Record
 from pyiceberg.types import (
     DoubleType,
-    LongType,
+    FloatType,
+    IcebergType,
+    IntegerType,
     NestedField,
+    PrimitiveType,
     StringType,
 )
 
-SIMPLE_SCHEMA = Schema(
-    NestedField(id=1, name="id", field_type=LongType()), NestedField(id=2, 
name="data", field_type=StringType(), required=False)
-)
+INT_MIN_VALUE = 30
+INT_MAX_VALUE = 79
+
+
+def _to_byte_buffer(field_type: IcebergType, val: Any) -> bytes:
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return to_bytes(field_type, val)
+
+
+INT_MIN = _to_byte_buffer(IntegerType(), INT_MIN_VALUE)
+INT_MAX = _to_byte_buffer(IntegerType(), INT_MAX_VALUE)
+
+STRING_MIN = _to_byte_buffer(StringType(), "a")
+STRING_MAX = _to_byte_buffer(StringType(), "z")
+
+
+@pytest.fixture
+def schema_data_file() -> Schema:
+    return Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(2, "no_stats", IntegerType(), required=False),
+        NestedField(3, "required", StringType(), required=True),
+        NestedField(4, "all_nulls", StringType(), required=False),
+        NestedField(5, "some_nulls", StringType(), required=False),
+        NestedField(6, "no_nulls", StringType(), required=False),
+        NestedField(7, "all_nans", DoubleType(), required=False),
+        NestedField(8, "some_nans", FloatType(), required=False),
+        NestedField(9, "no_nans", FloatType(), required=False),
+        NestedField(10, "all_nulls_double", DoubleType(), required=False),
+        NestedField(11, "all_nans_v1_stats", FloatType(), required=False),
+        NestedField(12, "nan_and_null_only", DoubleType(), required=False),
+        NestedField(13, "no_nan_stats", DoubleType(), required=False),
+        NestedField(14, "some_empty", StringType(), required=False),
+    )
+
+
+@pytest.fixture
+def data_file() -> DataFile:
+    return DataFile(
+        file_path="file_1.parquet",
+        file_format=FileFormat.PARQUET,
+        partition={},
+        record_count=50,
+        file_size_in_bytes=3,
+        value_counts={
+            1: 50,
+            2: 50,
+            3: 50,
+            4: 50,
+            5: 50,
+            6: 50,
+            7: 50,
+            8: 50,
+            9: 50,
+            10: 50,
+            11: 50,
+            12: 50,
+            13: 50,
+            14: 50,
+        },
+        null_value_counts={4: 50, 5: 10, 6: 0, 10: 50, 11: 0, 12: 1, 14: 8},
+        nan_value_counts={
+            7: 50,
+            8: 10,
+            9: 0,
+        },
+        lower_bounds={
+            1: to_bytes(IntegerType(), INT_MIN_VALUE),
+            11: to_bytes(FloatType(), float("nan")),
+            12: to_bytes(DoubleType(), float("nan")),
+            14: to_bytes(StringType(), ""),
+        },
+        upper_bounds={
+            1: to_bytes(IntegerType(), INT_MAX_VALUE),
+            11: to_bytes(FloatType(), float("nan")),
+            12: to_bytes(DoubleType(), float("nan")),
+            14: to_bytes(StringType(), "房东整租霍营小区二层两居室"),
+        },
+    )
 
-FLOAT_SCHEMA = Schema(
-    NestedField(id=1, name="id", field_type=LongType()), NestedField(id=2, 
name="f", field_type=DoubleType(), required=False)
-)
 
+def test_all_null(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNull("all_nulls")).eval(data_file)
+    assert not should_read, "Should skip: no non-null value in all null column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
LessThan("all_nulls", "a")).eval(data_file)
+    assert not should_read, "Should skip: lessThan on all null column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
LessThanOrEqual("all_nulls", "a")).eval(data_file)
+    assert not should_read, "Should skip: lessThanOrEqual on all null column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThan("all_nulls", "a")).eval(data_file)
+    assert not should_read, "Should skip: greaterThan on all null column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThanOrEqual("all_nulls", "a")).eval(data_file)
+    assert not should_read, "Should skip: greaterThanOrEqual on all null 
column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
EqualTo("all_nulls", "a")).eval(data_file)
+    assert not should_read, "Should skip: equal on all null column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNull("some_nulls")).eval(data_file)
+    assert should_read, "Should read: column with some nulls contains a 
non-null value"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNull("no_nulls")).eval(data_file)
+    assert should_read, "Should read: non-null column contains a non-null 
value"
+
+
+def test_no_nulls(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNull("all_nulls")).eval(data_file)
+    assert should_read, "Should read: at least one null value in all null 
column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNull("some_nulls")).eval(data_file)
+    assert should_read, "Should read: column with some nulls contains a null 
value"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNull("no_nulls")).eval(data_file)
+    assert not should_read, "Should skip: non-null column contains no null 
values"
+
+
+def test_is_nan(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("all_nans")).eval(data_file)
+    assert should_read, "Should read: at least one nan value in all nan column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("some_nans")).eval(data_file)
+    assert should_read, "Should read: at least one nan value in some nan 
column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("no_nans")).eval(data_file)
+    assert not should_read, "Should skip: no-nans column contains no nan 
values"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("all_nulls_double")).eval(data_file)
+    assert not should_read, "Should skip: all-null column doesn't contain nan 
value"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("no_nan_stats")).eval(data_file)
+    assert should_read, "Should read: no guarantee on if contains nan value 
without nan stats"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("all_nans_v1_stats")).eval(data_file)
+    assert should_read, "Should read: at least one nan value in all nan column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNaN("nan_and_null_only")).eval(data_file)
+    assert should_read, "Should read: at least one nan value in nan and nulls 
only column"
+
+
+def test_not_nan(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("all_nans")).eval(data_file)
+    assert not should_read, "Should skip: column with all nans will not 
contain non-nan"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("some_nans")).eval(data_file)
+    assert should_read, "Should read: at least one non-nan value in some nan 
column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("no_nans")).eval(data_file)
+    assert should_read, "Should read: at least one non-nan value in no nan 
column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("all_nulls_double")).eval(data_file)
+    assert should_read, "Should read: at least one non-nan value in all null 
column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("no_nan_stats")).eval(data_file)
+    assert should_read, "Should read: no guarantee on if contains nan value 
without nan stats"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("all_nans_v1_stats")).eval(data_file)
+    assert should_read, "Should read: no guarantee on if contains nan value 
without nan stats"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNaN("nan_and_null_only")).eval(data_file)
+    assert should_read, "Should read: at least one null value in nan and nulls 
only column"
+
+
+def test_required_column(schema_data_file: Schema, data_file: DataFile) -> 
None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotNull("required")).eval(data_file)
+    assert should_read, "Should read: required columns are always non-null"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
IsNull("required")).eval(data_file)
+    assert not should_read, "Should skip: required columns are always non-null"
+
+
+def test_missing_column(schema_data_file: Schema, data_file: DataFile) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        _ = _InclusiveMetricsEvaluator(schema_data_file, LessThan("missing", 
22)).eval(data_file)
+
+    assert str(exc_info.value) == "Could not find field with name missing, 
case_sensitive=True"
+
+
+def test_missing_stats() -> None:
+    no_stats_schema = Schema(
+        NestedField(2, "no_stats", DoubleType(), required=False),
+    )
+
+    no_stats_file = DataFile(
+        file_path="file_1.parquet",
+        file_format=FileFormat.PARQUET,
+        partition={},
+        record_count=50,
+        value_counts=None,
+        null_value_counts=None,
+        nan_value_counts=None,
+        lower_bounds=None,
+        upper_bounds=None,
+    )
+
+    expressions = [
+        LessThan("no_stats", 5),
+        LessThanOrEqual("no_stats", 30),
+        EqualTo("no_stats", 70),
+        GreaterThan("no_stats", 78),
+        GreaterThanOrEqual("no_stats", 90),
+        NotEqualTo("no_stats", 101),
+        IsNull("no_stats"),
+        NotNull("no_stats"),
+        IsNaN("no_stats"),
+        NotNaN("no_stats"),
+    ]
+
+    for expression in expressions:
+        should_read = _InclusiveMetricsEvaluator(no_stats_schema, 
expression).eval(no_stats_file)
+        assert should_read, f"Should read when stats are missing for: 
{expression}"
+
+
+def test_zero_record_file_stats(schema_data_file: Schema) -> None:
+    zero_record_data_file = DataFile(file_path="file_1.parquet", 
file_format=FileFormat.PARQUET, partition={}, record_count=0)
+
+    expressions = [
+        LessThan("no_stats", 5),
+        LessThanOrEqual("no_stats", 30),
+        EqualTo("no_stats", 70),
+        GreaterThan("no_stats", 78),
+        GreaterThanOrEqual("no_stats", 90),
+        NotEqualTo("no_stats", 101),
+        IsNull("no_stats"),
+        NotNull("no_stats"),
+        IsNaN("no_stats"),
+        NotNaN("no_stats"),
+    ]
+
+    for expression in expressions:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, 
expression).eval(zero_record_data_file)
+        assert not should_read, f"Should skip a datafile without records: 
{expression}"
+
+
+def test_not(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(LessThan("id", INT_MIN_VALUE - 25))).eval(data_file)
+    assert should_read, "Should read: not(false)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(GreaterThan("id", INT_MIN_VALUE - 25))).eval(data_file)
+    assert not should_read, "Should skip: not(true)"
+
+
+def test_and(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, And(LessThan("id", INT_MIN_VALUE - 25), 
GreaterThanOrEqual("id", INT_MIN_VALUE - 30))
+    ).eval(data_file)
+    assert not should_read, "Should skip: and(false, true)"
+
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, And(LessThan("id", INT_MIN_VALUE - 25), 
GreaterThanOrEqual("id", INT_MIN_VALUE + 1))
+    ).eval(data_file)
+    assert not should_read, "Should skip: and(false, false)"
+
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, And(GreaterThan("id", INT_MIN_VALUE - 25), 
LessThanOrEqual("id", INT_MIN_VALUE))
+    ).eval(data_file)
+    assert should_read, "Should read: and(true, true)"
+
+
+def test_or(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, Or(LessThan("id", INT_MIN_VALUE - 25), 
GreaterThanOrEqual("id", INT_MAX_VALUE + 1))
+    ).eval(data_file)
+    assert not should_read, "Should skip: or(false, false)"
+
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, Or(LessThan("id", INT_MIN_VALUE - 25), 
GreaterThanOrEqual("id", INT_MAX_VALUE - 19))
+    ).eval(data_file)
+    assert should_read, "Should read: or(false, true)"
+
+
+def test_integer_lt(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, LessThan("id", 
INT_MIN_VALUE - 25)).eval(data_file)
+    assert not should_read, "Should not read: id range below lower bound (5 < 
30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, LessThan("id", 
INT_MIN_VALUE)).eval(data_file)
+    assert not should_read, "Should not read: id range below lower bound (30 
is not < 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, LessThan("id", 
INT_MIN_VALUE + 1)).eval(data_file)
+    assert should_read, "Should read: one possible id"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, LessThan("id", 
INT_MAX_VALUE)).eval(data_file)
+    assert should_read, "Should read: many possible ids"
+
+
+def test_integer_lt_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
LessThanOrEqual("id", INT_MIN_VALUE - 25)).eval(data_file)
+    assert not should_read, "Should not read: id range below lower bound (5 < 
30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
LessThanOrEqual("id", INT_MIN_VALUE - 1)).eval(data_file)
+    assert not should_read, "Should not read: id range below lower bound (29 < 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
LessThanOrEqual("id", INT_MIN_VALUE)).eval(data_file)
+    assert should_read, "Should read: one possible id"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
LessThanOrEqual("id", INT_MAX_VALUE)).eval(data_file)
+    assert should_read, "Should read: many possible ids"
+
+
+def test_integer_gt(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThan("id", INT_MAX_VALUE + 6)).eval(data_file)
+    assert not should_read, "Should not read: id range above upper bound (85 > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThan("id", INT_MAX_VALUE)).eval(data_file)
+    assert not should_read, "Should not read: id range above upper bound (79 
is not > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThan("id", INT_MIN_VALUE - 1)).eval(data_file)
+    assert should_read, "Should read: one possible id"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThan("id", INT_MAX_VALUE - 4)).eval(data_file)
+    assert should_read, "Should read: many possible ids"
+
+
+def test_integer_gt_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThanOrEqual("id", INT_MAX_VALUE + 6)).eval(data_file)
+    assert not should_read, "Should not read: id range above upper bound (85 > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThanOrEqual("id", INT_MAX_VALUE + 1)).eval(data_file)
+    assert not should_read, "Should not read: id range above upper bound (80 > 
79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThanOrEqual("id", INT_MAX_VALUE)).eval(data_file)
+    assert should_read, "Should read: one possible id"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
GreaterThanOrEqual("id", INT_MAX_VALUE - 4)).eval(data_file)
+    assert should_read, "Should read: many possible ids"
+
+
+def test_integer_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MIN_VALUE - 25)).eval(data_file)
+    assert not should_read, "Should not read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MIN_VALUE - 1)).eval(data_file)
+    assert not should_read, "Should not read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MIN_VALUE)).eval(data_file)
+    assert should_read, "Should read: id equal to lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MAX_VALUE - 4)).eval(data_file)
+    assert should_read, "Should read: id between lower and upper bounds"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MAX_VALUE)).eval(data_file)
+    assert should_read, "Should read: id equal to upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MAX_VALUE + 1)).eval(data_file)
+    assert not should_read, "Should not read: id above upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", 
INT_MAX_VALUE + 6)).eval(data_file)
+    assert not should_read, "Should not read: id above upper bound"
+
+
+def test_integer_not_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MIN_VALUE - 25)).eval(data_file)
+    assert should_read, "Should read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MIN_VALUE - 1)).eval(data_file)
+    assert should_read, "Should read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MIN_VALUE)).eval(data_file)
+    assert should_read, "Should read: id equal to lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MAX_VALUE - 4)).eval(data_file)
+    assert should_read, "Should read: id between lower and upper bounds"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MAX_VALUE)).eval(data_file)
+    assert should_read, "Should read: id equal to upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MAX_VALUE + 1)).eval(data_file)
+    assert should_read, "Should read: id above upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotEqualTo("id", INT_MAX_VALUE + 6)).eval(data_file)
+    assert should_read, "Should read: id above upper bound"
+
+
+def test_integer_not_eq_rewritten(schema_data_file: Schema, data_file: 
DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MIN_VALUE - 25))).eval(data_file)
+    assert should_read, "Should read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MIN_VALUE - 1))).eval(data_file)
+    assert should_read, "Should read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MIN_VALUE))).eval(data_file)
+    assert should_read, "Should read: id equal to lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MAX_VALUE - 4))).eval(data_file)
+    assert should_read, "Should read: id between lower and upper bounds"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MAX_VALUE))).eval(data_file)
+    assert should_read, "Should read: id equal to upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MAX_VALUE + 1))).eval(data_file)
+    assert should_read, "Should read: id above upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("id", INT_MAX_VALUE + 6))).eval(data_file)
+    assert should_read, "Should read: id above upper bound"
+
+
+def test_integer_case_insensitive_not_eq_rewritten(schema_data_file: Schema, 
data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MIN_VALUE - 25)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MIN_VALUE - 1)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id below lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MIN_VALUE)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id equal to lower bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MAX_VALUE - 4)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id between lower and upper bounds"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MAX_VALUE)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id equal to upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MAX_VALUE + 1)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id above upper bound"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
Not(EqualTo("ID", INT_MAX_VALUE + 6)), case_sensitive=False).eval(
+        data_file
+    )
+    assert should_read, "Should read: id above upper bound"
+
+
+def test_missing_column_case_sensitive(schema_data_file: Schema, data_file: 
DataFile) -> None:
+    with pytest.raises(ValueError) as exc_info:
+        _ = _InclusiveMetricsEvaluator(schema_data_file, LessThan("ID", 22), 
case_sensitive=True).eval(data_file)
+
+    assert str(exc_info.value) == "Could not find field with name ID, 
case_sensitive=True"
+
+
+def test_integer_in(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MIN_VALUE - 25, INT_MIN_VALUE - 24})).eval(data_file)
+    assert not should_read, "Should not read: id below lower bound (5 < 30, 6 
< 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MIN_VALUE - 2, INT_MIN_VALUE - 1})).eval(data_file)
+    assert not should_read, "Should not read: id below lower bound (28 < 30, 
29 < 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MIN_VALUE - 1, INT_MIN_VALUE})).eval(data_file)
+    assert should_read, "Should read: id equal to lower bound (30 == 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MAX_VALUE - 4, INT_MAX_VALUE - 3})).eval(data_file)
+    assert should_read, "Should read: id between lower and upper bounds (30 < 
75 < 79, 30 < 76 < 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MAX_VALUE, INT_MAX_VALUE + 1})).eval(data_file)
+    assert should_read, "Should read: id equal to upper bound (79 == 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MAX_VALUE + 1, INT_MAX_VALUE + 2})).eval(data_file)
+    assert not should_read, "Should not read: id above upper bound (80 > 79, 
81 > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
{INT_MAX_VALUE + 6, INT_MAX_VALUE + 7})).eval(data_file)
+    assert not should_read, "Should not read: id above upper bound (85 > 79, 
86 > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("all_nulls", 
{"abc", "def"})).eval(data_file)
+    assert not should_read, "Should skip: in on all nulls column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
In("some_nulls", {"abc", "def"})).eval(data_file)
+    assert should_read, "Should read: in on some nulls column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("no_nulls", 
{"abc", "def"})).eval(data_file)
+    assert should_read, "Should read: in on no nulls column"
+
+    ids = list(range(400))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id", 
ids)).eval(data_file)
+    assert should_read, "Should read: large in expression"
+
+
+def test_integer_not_in(schema_data_file: Schema, data_file: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MIN_VALUE - 25, INT_MIN_VALUE - 24})).eval(
+        data_file
+    )
+    assert should_read, "Should read: id below lower bound (5 < 30, 6 < 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MIN_VALUE - 2, INT_MIN_VALUE - 1})).eval(
+        data_file
+    )
+    assert should_read, "Should read: id below lower bound (28 < 30, 29 < 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MIN_VALUE - 1, INT_MIN_VALUE})).eval(data_file)
+    assert should_read, "Should read: id equal to lower bound (30 == 30)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MAX_VALUE - 4, INT_MAX_VALUE - 3})).eval(
+        data_file
+    )
+    assert should_read, "Should read: id between lower and upper bounds (30 < 
75 < 79, 30 < 76 < 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MAX_VALUE, INT_MAX_VALUE + 1})).eval(data_file)
+    assert should_read, "Should read: id equal to upper bound (79 == 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MAX_VALUE + 1, INT_MAX_VALUE + 2})).eval(
+        data_file
+    )
+    assert should_read, "Should read: id above upper bound (80 > 79, 81 > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id", 
{INT_MAX_VALUE + 6, INT_MAX_VALUE + 7})).eval(
+        data_file
+    )
+    assert should_read, "Should read: id above upper bound (85 > 79, 86 > 79)"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotIn("all_nulls", {"abc", "def"})).eval(data_file)
+    assert should_read, "Should read: notIn on all nulls column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotIn("some_nulls", {"abc", "def"})).eval(data_file)
+    assert should_read, "Should read: notIn on some nulls column"
+
+    should_read = _InclusiveMetricsEvaluator(schema_data_file, 
NotIn("no_nulls", {"abc", "def"})).eval(data_file)
+    assert should_read, "Should read: notIn on no nulls column"
+
+
+@pytest.fixture
+def schema_data_file_nan() -> Schema:
+    return Schema(
+        NestedField(1, "all_nan", DoubleType(), required=True),
+        NestedField(2, "max_nan", DoubleType(), required=True),
+        NestedField(3, "min_max_nan", FloatType(), required=False),
+        NestedField(4, "all_nan_null_bounds", DoubleType(), required=True),
+        NestedField(5, "some_nan_correct_bounds", FloatType(), required=False),
+    )
+
+
+@pytest.fixture
+def data_file_nan() -> DataFile:
+    return DataFile(
+        file_path="file.avro",
+        file_format=FileFormat.PARQUET,
+        partition={},
+        record_count=50,
+        file_size_in_bytes=3,
+        column_sizes={
+            1: 10,
+            2: 10,
+            3: 10,
+            4: 10,
+            5: 10,
+        },
+        value_counts={
+            1: 10,
+            2: 10,
+            3: 10,
+            4: 10,
+            5: 10,
+        },
+        null_value_counts={
+            1: 0,
+            2: 0,
+            3: 0,
+            4: 0,
+            5: 0,
+        },
+        nan_value_counts={1: 10, 4: 10, 5: 5},
+        lower_bounds={
+            1: to_bytes(DoubleType(), float("nan")),
+            2: to_bytes(DoubleType(), 7),
+            3: to_bytes(FloatType(), float("nan")),
+            5: to_bytes(FloatType(), 7),
+        },
+        upper_bounds={
+            1: to_bytes(DoubleType(), float("nan")),
+            2: to_bytes(DoubleType(), float("nan")),
+            3: to_bytes(FloatType(), float("nan")),
+            5: to_bytes(FloatType(), 22),
+        },
+    )
+
+
+def 
test_inclusive_metrics_evaluator_less_than_and_less_than_equal(schema_data_file_nan:
 Schema, data_file_nan: DataFile) -> None:
+    for operator in [LessThan, LessThanOrEqual]:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("all_nan", 1)).eval(data_file_nan)
+        assert not should_read, "Should not match: all nan column doesn't 
contain number"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("max_nan", 1)).eval(data_file_nan)
+        assert not should_read, "Should not match: 1 is smaller than lower 
bound"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("max_nan", 10)).eval(data_file_nan)
+        assert should_read, "Should match: 10 is larger than lower bound"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("min_max_nan", 1)).eval(data_file_nan)
+        assert should_read, "Should match: no visibility"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("all_nan_null_bounds", 1)).eval(data_file_nan)
+        assert not should_read, "Should not match: all nan column doesn't 
contain number"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("some_nan_correct_bounds", 1)).eval(data_file_nan)
+        assert not should_read, "Should not match: 1 is smaller than lower 
bound"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("some_nan_correct_bounds", 10)).eval(
+            data_file_nan
+        )
+        assert should_read, "Should match: 10 larger than lower bound"
+
+
+def test_inclusive_metrics_evaluator_greater_than_and_greater_than_equal(
+    schema_data_file_nan: Schema, data_file_nan: DataFile
+) -> None:
+    for operator in [GreaterThan, GreaterThanOrEqual]:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("all_nan", 1)).eval(data_file_nan)
+        assert not should_read, "Should not match: all nan column doesn't 
contain number"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("max_nan", 1)).eval(data_file_nan)
+        assert should_read, "Should match: upper bound is larger than 1"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("max_nan", 10)).eval(data_file_nan)
+        assert should_read, "Should match: upper bound is larger than 10"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("min_max_nan", 1)).eval(data_file_nan)
+        assert should_read, "Should match: no visibility"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("all_nan_null_bounds", 1)).eval(data_file_nan)
+        assert not should_read, "Should not match: all nan column doesn't 
contain number"
+
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("some_nan_correct_bounds", 1)).eval(data_file_nan)
+        assert should_read, "Should match: 1 is smaller than upper bound"
 
-def _record_simple(id: int, data: Optional[str]) -> Record:  # pylint: 
disable=redefined-builtin
-    r = Record(struct=SIMPLE_SCHEMA.as_struct())
-    r[0] = id
-    r[1] = data
-    return r
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("some_nan_correct_bounds", 10)).eval(
+            data_file_nan
+        )
+        assert should_read, "Should match: 10 is smaller than upper bound"
 
+        should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
operator("all_nan", 30)).eval(data_file_nan)
+        assert not should_read, "Should not match: 30 is greater than upper 
bound"
 
-def _record_float(id: float, f: float) -> Record:  # pylint: 
disable=redefined-builtin
-    r = Record(struct=FLOAT_SCHEMA.as_struct())
-    r[0] = id
-    r[1] = f
-    return r
 
+def test_inclusive_metrics_evaluator_equals(schema_data_file_nan: Schema, 
data_file_nan: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("all_nan", 1)).eval(data_file_nan)
+    assert not should_read, "Should not match: all nan column doesn't contain 
number"
 
-def test_true() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, AlwaysTrue(), 
case_sensitive=True)
-    assert evaluate(Record(1, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("max_nan", 1)).eval(data_file_nan)
+    assert not should_read, "Should not match: 1 is smaller than lower bound"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("max_nan", 10)).eval(data_file_nan)
+    assert should_read, "Should match: 10 is within bounds"
 
-def test_false() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, AlwaysFalse(), 
case_sensitive=True)
-    assert not evaluate(Record(1, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("min_max_nan", 1)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("all_nan_null_bounds", 1)).eval(data_file_nan)
+    assert not should_read, "Should not match: all nan column doesn't contain 
number"
 
-def test_less_than() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, LessThan("id", 3), 
case_sensitive=True)
-    assert evaluate(Record(2, "a"))
-    assert not evaluate(Record(3, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("some_nan_correct_bounds", 1)).eval(data_file_nan)
+    assert not should_read, "Should not match: 1 is smaller than lower bound"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("some_nan_correct_bounds", 10)).eval(data_file_nan)
+    assert should_read, "Should match: 10 is within bounds"
 
-def test_less_than_or_equal() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, LessThanOrEqual("id", 3), 
case_sensitive=True)
-    assert evaluate(Record(1, "a"))
-    assert evaluate(Record(3, "a"))
-    assert not evaluate(Record(4, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
EqualTo("all_nan", 30)).eval(data_file_nan)
+    assert not should_read, "Should not match: 30 is greater than upper bound"
 
 
-def test_greater_than() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, GreaterThan("id", 3), 
case_sensitive=True)
-    assert not evaluate(Record(1, "a"))
-    assert not evaluate(Record(3, "a"))
-    assert evaluate(Record(4, "a"))
+def test_inclusive_metrics_evaluator_not_equals(schema_data_file_nan: Schema, 
data_file_nan: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("all_nan", 1)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("max_nan", 10)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_greater_than_or_equal() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, GreaterThanOrEqual("id", 
3), case_sensitive=True)
-    assert not evaluate(Record(2, "a"))
-    assert evaluate(Record(3, "a"))
-    assert evaluate(Record(4, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("max_nan", 10)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("min_max_nan", 1)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_equal_to() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, EqualTo("id", 3), 
case_sensitive=True)
-    assert not evaluate(Record(2, "a"))
-    assert evaluate(Record(3, "a"))
-    assert not evaluate(Record(4, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("all_nan_null_bounds", 1)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("some_nan_correct_bounds", 1)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_not_equal_to() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, NotEqualTo("id", 3), 
case_sensitive=True)
-    assert evaluate(Record(2, "a"))
-    assert not evaluate(Record(3, "a"))
-    assert evaluate(Record(4, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("some_nan_correct_bounds", 10)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotEqualTo("some_nan_correct_bounds", 30)).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_in() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, In("id", [1, 2, 3]), 
case_sensitive=True)
-    assert evaluate(Record(2, "a"))
-    assert evaluate(Record(3, "a"))
-    assert not evaluate(Record(4, "a"))
 
+def test_inclusive_metrics_evaluator_in(schema_data_file_nan: Schema, 
data_file_nan: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("all_nan", (1, 10, 30))).eval(data_file_nan)
+    assert not should_read, "Should not match: all nan column doesn't contain 
number"
 
-def test_not_in() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, NotIn("id", [1, 2, 3]), 
case_sensitive=True)
-    assert not evaluate(Record(2, "a"))
-    assert not evaluate(Record(3, "a"))
-    assert evaluate(Record(4, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("max_nan", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: 10 and 30 are greater than lower bound"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("min_max_nan", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_is_null() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, IsNull("data"), 
case_sensitive=True)
-    assert not evaluate(Record(2, "a"))
-    assert evaluate(Record(3, None))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("all_nan_null_bounds", (1, 10, 30))).eval(data_file_nan)
+    assert not should_read, "Should not match: all nan column doesn't contain 
number"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("some_nan_correct_bounds", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: 10 within bounds"
 
-def test_not_null() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, NotNull("data"), 
case_sensitive=True)
-    assert evaluate(Record(2, "a"))
-    assert not evaluate(Record(3, None))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("some_nan_correct_bounds", (1, 30))).eval(data_file_nan)
+    assert not should_read, "Should not match: 1 and 30 not within bounds"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("some_nan_correct_bounds", (5, 7))).eval(data_file_nan)
+    assert should_read, "Should match: overlap with lower bound"
 
-def test_is_nan() -> None:
-    evaluate = expression_evaluator(FLOAT_SCHEMA, IsNaN("f"), 
case_sensitive=True)
-    assert not evaluate(_record_float(2, f=0.0))
-    assert not evaluate(_record_float(3, f=float("infinity")))
-    assert evaluate(_record_float(4, f=float("nan")))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
In("some_nan_correct_bounds", (22, 25))).eval(data_file_nan)
+    assert should_read, "Should match: overlap with upper bounds"
 
 
-def test_not_nan() -> None:
-    evaluate = expression_evaluator(FLOAT_SCHEMA, NotNaN("f"), 
case_sensitive=True)
-    assert evaluate(_record_float(2, f=0.0))
-    assert evaluate(_record_float(3, f=float("infinity")))
-    assert not evaluate(_record_float(4, f=float("nan")))
+def test_inclusive_metrics_evaluator_not_in(schema_data_file_nan: Schema, 
data_file_nan: DataFile) -> None:
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("all_nan", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("max_nan", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_not() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, Not(LessThan("id", 3)), 
case_sensitive=True)
-    assert not evaluate(Record(2, "a"))
-    assert evaluate(Record(3, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("max_nan", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("min_max_nan", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_and() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, And(LessThan("id", 3), 
GreaterThan("id", 1)), case_sensitive=True)
-    assert not evaluate(Record(1, "a"))
-    assert evaluate(Record(2, "a"))
-    assert not evaluate(Record(3, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("all_nan_null_bounds", (1, 10, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("some_nan_correct_bounds", (1, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"
 
-def test_or() -> None:
-    evaluate = expression_evaluator(SIMPLE_SCHEMA, Or(LessThan("id", 2), 
GreaterThan("id", 2)), case_sensitive=True)
-    assert evaluate(Record(1, "a"))
-    assert not evaluate(Record(2, "a"))
-    assert evaluate(Record(3, "a"))
+    should_read = _InclusiveMetricsEvaluator(schema_data_file_nan, 
NotIn("some_nan_correct_bounds", (1, 30))).eval(data_file_nan)
+    assert should_read, "Should match: no visibility"

Reply via email to