This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 36b7c933b4 Python: Filter on Datafile metrics (#6714)
36b7c933b4 is described below
commit 36b7c933b444bd3df2137e38eb275e12a8ea7afc
Author: Fokko Driesprong <[email protected]>
AuthorDate: Mon Feb 27 23:02:49 2023 +0100
Python: Filter on Datafile metrics (#6714)
---
python/pyiceberg/conversions.py | 3 +-
python/pyiceberg/expressions/literals.py | 3 +
python/pyiceberg/expressions/visitors.py | 274 +++++++++-
python/pyiceberg/io/pyarrow.py | 4 +-
python/pyiceberg/table/__init__.py | 23 +-
python/tests/expressions/test_evaluator.py | 803 +++++++++++++++++++++++++----
6 files changed, 1002 insertions(+), 108 deletions(-)
diff --git a/python/pyiceberg/conversions.py b/python/pyiceberg/conversions.py
index 4707dab85c..948ddcdcad 100644
--- a/python/pyiceberg/conversions.py
+++ b/python/pyiceberg/conversions.py
@@ -38,6 +38,7 @@ from typing import (
Union,
)
+from pyiceberg.typedef import L
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -241,7 +242,7 @@ def _(primitive_type: DecimalType, value: Decimal) -> bytes:
@singledispatch
-def from_bytes(primitive_type: PrimitiveType, b: bytes) -> Union[bool, bytes,
Decimal, float, int, str, uuid.UUID]:
+def from_bytes(primitive_type: PrimitiveType, b: bytes) -> L:
"""A generic function which converts bytes to a built-in python value
Args:
diff --git a/python/pyiceberg/expressions/literals.py
b/python/pyiceberg/expressions/literals.py
index 3e3233af0d..ba380cfd67 100644
--- a/python/pyiceberg/expressions/literals.py
+++ b/python/pyiceberg/expressions/literals.py
@@ -328,6 +328,9 @@ class FloatLiteral(Literal[float]):
def __ge__(self, other: Any) -> bool:
return self._value32 >= other
+    def __hash__(self) -> int:
+        """Hash the literal by the same ``_value32`` that the comparison operators use."""
+        value32 = self._value32
+        return hash(value32)
+
@singledispatchmethod
def to(self, type_var: IcebergType) -> Literal: # type: ignore
raise TypeError(f"Cannot convert FloatLiteral into {type_var}")
diff --git a/python/pyiceberg/expressions/visitors.py
b/python/pyiceberg/expressions/visitors.py
index 81e0ebc559..4fc666bcd9 100644
--- a/python/pyiceberg/expressions/visitors.py
+++ b/python/pyiceberg/expressions/visitors.py
@@ -14,11 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import math
from abc import ABC, abstractmethod
from functools import singledispatch
from typing import (
Any,
Callable,
+ Dict,
Generic,
List,
Set,
@@ -56,15 +58,16 @@ from pyiceberg.expressions import (
UnboundPredicate,
)
from pyiceberg.expressions.literals import Literal
-from pyiceberg.manifest import ManifestFile, PartitionFieldSummary
+from pyiceberg.manifest import DataFile, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
-from pyiceberg.typedef import StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
from pyiceberg.types import (
DoubleType,
FloatType,
IcebergType,
PrimitiveType,
+ StructType,
TimestampType,
TimestamptzType,
)
@@ -986,3 +989,270 @@ def expression_to_plain_format(
# In the form of expr1 ∨ expr2 ∨ ... ∨ exprN
visitor = ExpressionToPlainFormat(cast_int_to_datetime)
return [visit(expression, visitor) for expression in expressions]
+
+
+class _InclusiveMetricsEvaluator(BoundBooleanExpressionVisitor[bool]):
+    """Evaluate a bound expression against the column metrics of a data file.
+
+    The evaluation is inclusive: ROWS_CANNOT_MATCH is returned only when the
+    counts and bounds recorded for the file rule out every row. In all other
+    cases, including missing metrics, the answer is ROWS_MIGHT_MATCH.
+    """
+
+    struct: StructType
+    expr: BooleanExpression
+
+    # Per-file metrics keyed by field id; eval() sets them on a private copy.
+    value_counts: Dict[int, int]
+    null_counts: Dict[int, int]
+    nan_counts: Dict[int, int]
+    lower_bounds: Dict[int, bytes]
+    upper_bounds: Dict[int, bytes]
+
+    def __init__(self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True) -> None:
+        """Rewrite NOT nodes out of ``expr`` and bind the result to ``schema``."""
+        self.struct = schema.as_struct()
+        self.expr = bind(schema, rewrite_not(expr), case_sensitive)
+
+    def eval(self, file: DataFile) -> bool:
+        """Test whether the file may contain records that match the expression.
+
+        The file's metrics are attached to a shallow copy of this evaluator
+        rather than to ``self``. A single evaluator (or its bound ``eval``)
+        can therefore be shared between threads without one file's metrics
+        overwriting another's while the expression is being visited.
+        """
+        from copy import copy  # function-scope import keeps this block self-contained
+
+        if file.record_count == 0:
+            return ROWS_CANNOT_MATCH
+
+        if file.record_count < 0:
+            # Older versions don't correctly implement record count from avro files and thus
+            # set record count -1 when importing avro tables to iceberg tables. This should
+            # be updated once we implemented and set correct record count.
+            return ROWS_MIGHT_MATCH
+
+        evaluator = copy(self)
+        evaluator.value_counts = file.value_counts or EMPTY_DICT
+        evaluator.null_counts = file.null_value_counts or EMPTY_DICT
+        evaluator.nan_counts = file.nan_value_counts or EMPTY_DICT
+        evaluator.lower_bounds = file.lower_bounds or EMPTY_DICT
+        evaluator.upper_bounds = file.upper_bounds or EMPTY_DICT
+
+        return visit(self.expr, evaluator)
+
+    def _contains_nulls_only(self, field_id: int) -> bool:
+        """Return True when both counts are known, non-zero and equal (every value is null)."""
+        if (value_count := self.value_counts.get(field_id)) and (null_count := self.null_counts.get(field_id)):
+            return value_count == null_count
+        return False
+
+    def _contains_nans_only(self, field_id: int) -> bool:
+        """Return True when both counts are known, non-zero and equal (every value is NaN)."""
+        if (nan_count := self.nan_counts.get(field_id)) and (value_count := self.value_counts.get(field_id)):
+            return nan_count == value_count
+        return False
+
+    def _has_no_comparable_values(self, field_id: int) -> bool:
+        """Return True when the column holds only nulls or only NaNs, so no comparison can match."""
+        return self._contains_nulls_only(field_id) or self._contains_nans_only(field_id)
+
+    def _decode_bound(self, bounds: Dict[int, bytes], field: Any) -> Any:
+        """Decode the bound stored for ``field``, or return None when the file has none.
+
+        Raises:
+            ValueError: If the field's type is not a PrimitiveType.
+        """
+        field_type = field.field_type
+        if not isinstance(field_type, PrimitiveType):
+            raise ValueError(f"Expected PrimitiveType: {field_type}")
+
+        bound_bytes = bounds.get(field.field_id)
+        # Compare against None explicitly: an empty byte string is a valid
+        # serialized bound (the empty string) and must not be treated as missing.
+        if bound_bytes is None:
+            return None
+
+        return from_bytes(field_type, bound_bytes)
+
+    def _is_nan(self, val: Any) -> bool:
+        """Return True when ``val`` is a NaN; values math.isnan cannot handle are not NaN."""
+        try:
+            return math.isnan(val)
+        except TypeError:
+            # In the case of None or other non-numeric types
+            return False
+
+    def visit_true(self) -> bool:
+        # all rows match
+        return ROWS_MIGHT_MATCH
+
+    def visit_false(self) -> bool:
+        # all rows fail
+        return ROWS_CANNOT_MATCH
+
+    def visit_not(self, child_result: bool) -> bool:
+        """NOT nodes are removed by rewrite_not in __init__, so reaching this is an error."""
+        raise ValueError(f"NOT should be rewritten: {child_result}")
+
+    def visit_and(self, left_result: bool, right_result: bool) -> bool:
+        return left_result and right_result
+
+    def visit_or(self, left_result: bool, right_result: bool) -> bool:
+        return left_result or right_result
+
+    def visit_is_null(self, term: BoundTerm[L]) -> bool:
+        """Rows cannot match only when the null count is known to be zero."""
+        field_id = term.ref().field.field_id
+
+        if self.null_counts.get(field_id) == 0:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_null(self, term: BoundTerm[L]) -> bool:
+        # no need to check whether the field is required because binding evaluates that case
+        # if the column has no non-null values, the expression cannot match
+        field_id = term.ref().field.field_id
+
+        if self._contains_nulls_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
+        """Rows cannot match when the NaN count is zero or the column is entirely null."""
+        field_id = term.ref().field.field_id
+
+        if self.nan_counts.get(field_id) == 0:
+            return ROWS_CANNOT_MATCH
+
+        # when there's no nanCounts information, but we already know the column only contains null,
+        # it's guaranteed that there's no NaN value
+        if self._contains_nulls_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
+        """Rows cannot match only when every value in the column is NaN."""
+        field_id = term.ref().field.field_id
+
+        if self._contains_nans_only(field_id):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        """Rows cannot match when the lower bound is already >= the literal."""
+        field = term.ref().field
+
+        if self._has_no_comparable_values(field.field_id):
+            return ROWS_CANNOT_MATCH
+
+        lower_bound = self._decode_bound(self.lower_bounds, field)
+        if lower_bound is None or self._is_nan(lower_bound):
+            # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+            return ROWS_MIGHT_MATCH
+
+        if lower_bound >= literal.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        """Rows cannot match when the lower bound is greater than the literal."""
+        field = term.ref().field
+
+        if self._has_no_comparable_values(field.field_id):
+            return ROWS_CANNOT_MATCH
+
+        lower_bound = self._decode_bound(self.lower_bounds, field)
+        if lower_bound is None or self._is_nan(lower_bound):
+            # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+            return ROWS_MIGHT_MATCH
+
+        if lower_bound > literal.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        """Rows cannot match when the upper bound is already <= the literal."""
+        field = term.ref().field
+
+        if self._has_no_comparable_values(field.field_id):
+            return ROWS_CANNOT_MATCH
+
+        upper_bound = self._decode_bound(self.upper_bounds, field)
+        if upper_bound is None or self._is_nan(upper_bound):
+            # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+            # The check comes first because a NaN never satisfies the comparison below.
+            return ROWS_MIGHT_MATCH
+
+        if upper_bound <= literal.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        """Rows cannot match when the upper bound is smaller than the literal."""
+        field = term.ref().field
+
+        if self._has_no_comparable_values(field.field_id):
+            return ROWS_CANNOT_MATCH
+
+        upper_bound = self._decode_bound(self.upper_bounds, field)
+        if upper_bound is None or self._is_nan(upper_bound):
+            # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+            # The check comes first because a NaN never satisfies the comparison below.
+            return ROWS_MIGHT_MATCH
+
+        if upper_bound < literal.value:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        """Rows cannot match when the literal lies outside [lower bound, upper bound]."""
+        field = term.ref().field
+
+        if self._has_no_comparable_values(field.field_id):
+            return ROWS_CANNOT_MATCH
+
+        lower_bound = self._decode_bound(self.lower_bounds, field)
+        if lower_bound is not None:
+            if self._is_nan(lower_bound):
+                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            if lower_bound > literal.value:
+                return ROWS_CANNOT_MATCH
+
+        upper_bound = self._decode_bound(self.upper_bounds, field)
+        if upper_bound is not None:
+            if self._is_nan(upper_bound):
+                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            if upper_bound < literal.value:
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
+        """Not answered from the metrics, so the file always might match."""
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
+        """Rows cannot match when no literal falls inside [lower bound, upper bound]."""
+        field = term.ref().field
+
+        if self._has_no_comparable_values(field.field_id):
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            # skip evaluating the predicate if the number of values is too big
+            return ROWS_MIGHT_MATCH
+
+        candidates = literals
+
+        lower_bound = self._decode_bound(self.lower_bounds, field)
+        if lower_bound is not None:
+            if self._is_nan(lower_bound):
+                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
+                return ROWS_MIGHT_MATCH
+
+            candidates = {lit for lit in candidates if lower_bound <= lit}
+            if not candidates:
+                return ROWS_CANNOT_MATCH
+
+        upper_bound = self._decode_bound(self.upper_bounds, field)
+        if upper_bound is not None:
+            # this is different from Java, here NaN is always larger
+            if self._is_nan(upper_bound):
+                return ROWS_MIGHT_MATCH
+
+            candidates = {lit for lit in candidates if upper_bound >= lit}
+            if not candidates:
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
+        # because the bounds are not necessarily a min or max value, this cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
+        return ROWS_MIGHT_MATCH
diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index a064ae3bdf..d2cd762273 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -555,8 +555,10 @@ def project_table(
if len(tables) > 1:
return pa.concat_tables(tables)
- else:
+ elif len(tables) == 1:
return tables[0]
+ else:
+ return pa.Table.from_batches([],
schema=schema_to_pyarrow(projected_schema))
def to_requested_schema(requested_schema: Schema, file_schema: Schema, table:
pa.Table) -> pa.Table:
diff --git a/python/pyiceberg/table/__init__.py
b/python/pyiceberg/table/__init__.py
index 7c53fd0967..69ce08f457 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -43,7 +43,7 @@ from pyiceberg.expressions import (
parser,
visitors,
)
-from pyiceberg.expressions.visitors import inclusive_projection
+from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator,
inclusive_projection
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.manifest import (
DataFile,
@@ -314,11 +314,16 @@ def _check_content(file: DataFile) -> DataFile:
return file
-def _open_manifest(io: FileIO, manifest: ManifestFile, partition_filter:
Callable[[DataFile], bool]) -> List[FileScanTask]:
+def _open_manifest(
+ io: FileIO,
+ manifest: ManifestFile,
+ partition_filter: Callable[[DataFile], bool],
+ metrics_evaluator: Callable[[DataFile], bool],
+) -> List[FileScanTask]:
all_files = files(io.new_input(manifest.manifest_path))
matching_partition_files = filter(partition_filter, all_files)
matching_partition_data_files = map(_check_content,
matching_partition_files)
- return [FileScanTask(file) for file in matching_partition_data_files]
+ return [FileScanTask(file) for file in matching_partition_data_files if
metrics_evaluator(file)]
class DataScan(TableScan):
@@ -376,12 +381,20 @@ class DataScan(TableScan):
# this filter depends on the partition spec used to write the manifest
file
partition_evaluators: Dict[int, Callable[[DataFile], bool]] =
KeyDefaultDict(self._build_partition_evaluator)
-
+ metrics_evaluator = _InclusiveMetricsEvaluator(self.table.schema(),
self.row_filter, self.case_sensitive).eval
with ThreadPool() as pool:
return chain(
*pool.starmap(
func=_open_manifest,
- iterable=[(io, manifest,
partition_evaluators[manifest.partition_spec_id]) for manifest in manifests],
+ iterable=[
+ (
+ io,
+ manifest,
+ partition_evaluators[manifest.partition_spec_id],
+ metrics_evaluator,
+ )
+ for manifest in manifests
+ ],
)
)
diff --git a/python/tests/expressions/test_evaluator.py
b/python/tests/expressions/test_evaluator.py
index b57ede62f2..7de9c04fb7 100644
--- a/python/tests/expressions/test_evaluator.py
+++ b/python/tests/expressions/test_evaluator.py
@@ -14,11 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+# pylint:disable=redefined-outer-name
+from typing import Any
+import pytest
+
+from pyiceberg.conversions import to_bytes
from pyiceberg.expressions import (
- AlwaysFalse,
- AlwaysTrue,
And,
EqualTo,
GreaterThan,
@@ -35,145 +37,748 @@ from pyiceberg.expressions import (
NotNull,
Or,
)
-from pyiceberg.expressions.visitors import expression_evaluator
+from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator
+from pyiceberg.manifest import DataFile, FileFormat
from pyiceberg.schema import Schema
-from pyiceberg.typedef import Record
from pyiceberg.types import (
DoubleType,
- LongType,
+ FloatType,
+ IcebergType,
+ IntegerType,
NestedField,
+ PrimitiveType,
StringType,
)
-SIMPLE_SCHEMA = Schema(
- NestedField(id=1, name="id", field_type=LongType()), NestedField(id=2,
name="data", field_type=StringType(), required=False)
-)
+# Values used below as the lower and upper bound recorded for field 1 ("id").
+INT_MIN_VALUE = 30
+INT_MAX_VALUE = 79
+
+
+def _to_byte_buffer(field_type: IcebergType, val: Any) -> bytes:
+    """Serialize ``val`` with ``to_bytes``; raise ValueError for a non-primitive ``field_type``."""
+    if isinstance(field_type, PrimitiveType):
+        return to_bytes(field_type, val)
+    raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+
+
+# INT_MIN_VALUE and INT_MAX_VALUE serialized as IntegerType bytes.
+INT_MIN = _to_byte_buffer(IntegerType(), INT_MIN_VALUE)
+INT_MAX = _to_byte_buffer(IntegerType(), INT_MAX_VALUE)
+
+# The strings "a" and "z" serialized as StringType bytes.
+STRING_MIN = _to_byte_buffer(StringType(), "a")
+STRING_MAX = _to_byte_buffer(StringType(), "z")
+
+
+@pytest.fixture
+def schema_data_file() -> Schema:
+    """Schema of 14 fields whose names describe the metrics the data_file fixture records for them."""
+    return Schema(
+        NestedField(1, "id", IntegerType(), required=True),
+        NestedField(2, "no_stats", IntegerType(), required=False),
+        NestedField(3, "required", StringType(), required=True),
+        NestedField(4, "all_nulls", StringType(), required=False),
+        NestedField(5, "some_nulls", StringType(), required=False),
+        NestedField(6, "no_nulls", StringType(), required=False),
+        NestedField(7, "all_nans", DoubleType(), required=False),
+        NestedField(8, "some_nans", FloatType(), required=False),
+        NestedField(9, "no_nans", FloatType(), required=False),
+        NestedField(10, "all_nulls_double", DoubleType(), required=False),
+        NestedField(11, "all_nans_v1_stats", FloatType(), required=False),
+        NestedField(12, "nan_and_null_only", DoubleType(), required=False),
+        NestedField(13, "no_nan_stats", DoubleType(), required=False),
+        NestedField(14, "some_empty", StringType(), required=False),
+    )
+
+
+@pytest.fixture
+def data_file() -> DataFile:
+    """Data file of 50 records with counts and bounds for the fields of schema_data_file."""
+    return DataFile(
+        file_path="file_1.parquet",
+        file_format=FileFormat.PARQUET,
+        partition={},
+        record_count=50,
+        file_size_in_bytes=3,
+        # each of the field ids 1..14 reports 50 values
+        value_counts={field_id: 50 for field_id in range(1, 15)},
+        null_value_counts={4: 50, 5: 10, 6: 0, 10: 50, 11: 0, 12: 1, 14: 8},
+        nan_value_counts={7: 50, 8: 10, 9: 0},
+        lower_bounds={
+            1: to_bytes(IntegerType(), INT_MIN_VALUE),
+            11: to_bytes(FloatType(), float("nan")),
+            12: to_bytes(DoubleType(), float("nan")),
+            14: to_bytes(StringType(), ""),
+        },
+        upper_bounds={
+            1: to_bytes(IntegerType(), INT_MAX_VALUE),
+            11: to_bytes(FloatType(), float("nan")),
+            12: to_bytes(DoubleType(), float("nan")),
+            14: to_bytes(StringType(), "房东整租霍营小区二层两居室"),
+        },
+    )
-FLOAT_SCHEMA = Schema(
- NestedField(id=1, name="id", field_type=LongType()), NestedField(id=2,
name="f", field_type=DoubleType(), required=False)
-)
+def test_all_null(schema_data_file: Schema, data_file: DataFile) -> None:
+    """Predicates on the all-null column are skipped; NotNull on the other columns is read."""
+    cases = [
+        (NotNull("all_nulls"), False, "Should skip: no non-null value in all null column"),
+        (LessThan("all_nulls", "a"), False, "Should skip: lessThan on all null column"),
+        (LessThanOrEqual("all_nulls", "a"), False, "Should skip: lessThanOrEqual on all null column"),
+        (GreaterThan("all_nulls", "a"), False, "Should skip: greaterThan on all null column"),
+        (GreaterThanOrEqual("all_nulls", "a"), False, "Should skip: greaterThanOrEqual on all null column"),
+        (EqualTo("all_nulls", "a"), False, "Should skip: equal on all null column"),
+        (NotNull("some_nulls"), True, "Should read: column with some nulls contains a non-null value"),
+        (NotNull("no_nulls"), True, "Should read: non-null column contains a non-null value"),
+    ]
+
+    for expression, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, expression).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_no_nulls(schema_data_file: Schema, data_file: DataFile) -> None:
+    """IsNull is read unless the column's null count is zero."""
+    cases = [
+        ("all_nulls", True, "Should read: at least one null value in all null column"),
+        ("some_nulls", True, "Should read: column with some nulls contains a null value"),
+        ("no_nulls", False, "Should skip: non-null column contains no null values"),
+    ]
+
+    for column, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, IsNull(column)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_is_nan(schema_data_file: Schema, data_file: DataFile) -> None:
+    """IsNaN is skipped only for a zero NaN count or an all-null column."""
+    cases = [
+        ("all_nans", True, "Should read: at least one nan value in all nan column"),
+        ("some_nans", True, "Should read: at least one nan value in some nan column"),
+        ("no_nans", False, "Should skip: no-nans column contains no nan values"),
+        ("all_nulls_double", False, "Should skip: all-null column doesn't contain nan value"),
+        ("no_nan_stats", True, "Should read: no guarantee on if contains nan value without nan stats"),
+        ("all_nans_v1_stats", True, "Should read: at least one nan value in all nan column"),
+        ("nan_and_null_only", True, "Should read: at least one nan value in nan and nulls only column"),
+    ]
+
+    for column, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, IsNaN(column)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_not_nan(schema_data_file: Schema, data_file: DataFile) -> None:
+    """NotNaN is skipped only for the column whose NaN count equals its value count."""
+    cases = [
+        ("all_nans", False, "Should skip: column with all nans will not contain non-nan"),
+        ("some_nans", True, "Should read: at least one non-nan value in some nan column"),
+        ("no_nans", True, "Should read: at least one non-nan value in no nan column"),
+        ("all_nulls_double", True, "Should read: at least one non-nan value in all null column"),
+        ("no_nan_stats", True, "Should read: no guarantee on if contains nan value without nan stats"),
+        ("all_nans_v1_stats", True, "Should read: no guarantee on if contains nan value without nan stats"),
+        ("nan_and_null_only", True, "Should read: at least one null value in nan and nulls only column"),
+    ]
+
+    for column, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, NotNaN(column)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_required_column(schema_data_file: Schema, data_file: DataFile) -> None:
+    """NotNull on the required column is read and IsNull on it is skipped."""
+    not_null_result = _InclusiveMetricsEvaluator(schema_data_file, NotNull("required")).eval(data_file)
+    assert not_null_result, "Should read: required columns are always non-null"
+
+    is_null_result = _InclusiveMetricsEvaluator(schema_data_file, IsNull("required")).eval(data_file)
+    assert not is_null_result, "Should skip: required columns are always non-null"
+
+
+def test_missing_column(schema_data_file: Schema, data_file: DataFile) -> None:
+    """A predicate on a column missing from the schema raises ValueError with a fixed message."""
+    predicate = LessThan("missing", 22)
+
+    with pytest.raises(ValueError) as raised:
+        _InclusiveMetricsEvaluator(schema_data_file, predicate).eval(data_file)
+
+    assert str(raised.value) == "Could not find field with name missing, case_sensitive=True"
+
+
+def test_missing_stats() -> None:
+    """Every predicate is read when the data file carries no metrics at all."""
+    no_stats_schema = Schema(NestedField(2, "no_stats", DoubleType(), required=False))
+
+    # All five metric maps are explicitly absent.
+    no_stats_file = DataFile(
+        file_path="file_1.parquet",
+        file_format=FileFormat.PARQUET,
+        partition={},
+        record_count=50,
+        value_counts=None,
+        null_value_counts=None,
+        nan_value_counts=None,
+        lower_bounds=None,
+        upper_bounds=None,
+    )
+
+    column = "no_stats"
+    expressions = [
+        LessThan(column, 5),
+        LessThanOrEqual(column, 30),
+        EqualTo(column, 70),
+        GreaterThan(column, 78),
+        GreaterThanOrEqual(column, 90),
+        NotEqualTo(column, 101),
+        IsNull(column),
+        NotNull(column),
+        IsNaN(column),
+        NotNaN(column),
+    ]
+
+    for expression in expressions:
+        evaluator = _InclusiveMetricsEvaluator(no_stats_schema, expression)
+        should_read = evaluator.eval(no_stats_file)
+        assert should_read, f"Should read when stats are missing for: {expression}"
+
+
+def test_zero_record_file_stats(schema_data_file: Schema) -> None:
+    """Every predicate is skipped for a data file whose record count is zero."""
+    zero_record_data_file = DataFile(
+        file_path="file_1.parquet",
+        file_format=FileFormat.PARQUET,
+        partition={},
+        record_count=0,
+    )
+
+    column = "no_stats"
+    expressions = [
+        LessThan(column, 5),
+        LessThanOrEqual(column, 30),
+        EqualTo(column, 70),
+        GreaterThan(column, 78),
+        GreaterThanOrEqual(column, 90),
+        NotEqualTo(column, 101),
+        IsNull(column),
+        NotNull(column),
+        IsNaN(column),
+        NotNaN(column),
+    ]
+
+    for expression in expressions:
+        evaluator = _InclusiveMetricsEvaluator(schema_data_file, expression)
+        should_read = evaluator.eval(zero_record_data_file)
+        assert not should_read, f"Should skip a datafile without records: {expression}"
+
+
+def test_not(schema_data_file: Schema, data_file: DataFile) -> None:
+    """Not(...) wrapped around a skipped predicate is read, and around a read predicate is skipped."""
+    threshold = INT_MIN_VALUE - 25
+
+    negated_lt = Not(LessThan("id", threshold))
+    assert _InclusiveMetricsEvaluator(schema_data_file, negated_lt).eval(data_file), "Should read: not(false)"
+
+    negated_gt = Not(GreaterThan("id", threshold))
+    assert not _InclusiveMetricsEvaluator(schema_data_file, negated_gt).eval(data_file), "Should skip: not(true)"
+
+
+def test_and(schema_data_file: Schema, data_file: DataFile) -> None:
+    """And is read only when both operands might match."""
+    below_range = LessThan("id", INT_MIN_VALUE - 25)  # cannot match: 5 is under the lower bound
+
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, And(below_range, GreaterThanOrEqual("id", INT_MIN_VALUE - 30))
+    ).eval(data_file)
+    assert not should_read, "Should skip: and(false, true)"
+
+    # The right operand must lie above the upper bound to be false as well;
+    # INT_MIN_VALUE + 1 would have made it true and repeated the previous case.
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, And(below_range, GreaterThanOrEqual("id", INT_MAX_VALUE + 1))
+    ).eval(data_file)
+    assert not should_read, "Should skip: and(false, false)"
+
+    should_read = _InclusiveMetricsEvaluator(
+        schema_data_file, And(GreaterThan("id", INT_MIN_VALUE - 25), LessThanOrEqual("id", INT_MIN_VALUE))
+    ).eval(data_file)
+    assert should_read, "Should read: and(true, true)"
+
+
+def test_or(schema_data_file: Schema, data_file: DataFile) -> None:
+    """Or is skipped only when neither operand might match."""
+    cases = [
+        (GreaterThanOrEqual("id", INT_MAX_VALUE + 1), False, "Should skip: or(false, false)"),
+        (GreaterThanOrEqual("id", INT_MAX_VALUE - 19), True, "Should read: or(false, true)"),
+    ]
+
+    for right_operand, expected, message in cases:
+        expression = Or(LessThan("id", INT_MIN_VALUE - 25), right_operand)
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, expression).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_integer_lt(schema_data_file: Schema, data_file: DataFile) -> None:
+    """LessThan on "id" is skipped while the literal is not above the lower bound (30)."""
+    cases = [
+        (INT_MIN_VALUE - 25, False, "Should not read: id range below lower bound (5 < 30)"),
+        (INT_MIN_VALUE, False, "Should not read: id range below lower bound (30 is not < 30)"),
+        (INT_MIN_VALUE + 1, True, "Should read: one possible id"),
+        (INT_MAX_VALUE, True, "Should read: many possible ids"),
+    ]
+
+    for literal, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, LessThan("id", literal)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_integer_lt_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    """LessThanOrEqual on "id" is skipped while the literal is below the lower bound (30)."""
+    cases = [
+        (INT_MIN_VALUE - 25, False, "Should not read: id range below lower bound (5 < 30)"),
+        (INT_MIN_VALUE - 1, False, "Should not read: id range below lower bound (29 < 30)"),
+        (INT_MIN_VALUE, True, "Should read: one possible id"),
+        (INT_MAX_VALUE, True, "Should read: many possible ids"),
+    ]
+
+    for literal, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, LessThanOrEqual("id", literal)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_integer_gt(schema_data_file: Schema, data_file: DataFile) -> None:
+    """GreaterThan on "id" is skipped while the literal is not below the upper bound (79)."""
+    cases = [
+        (INT_MAX_VALUE + 6, False, "Should not read: id range above upper bound (85 > 79)"),
+        (INT_MAX_VALUE, False, "Should not read: id range above upper bound (79 is not > 79)"),
+        # One below the upper bound leaves exactly one candidate id (79);
+        # INT_MIN_VALUE - 1 would have matched every id in the file.
+        (INT_MAX_VALUE - 1, True, "Should read: one possible id"),
+        (INT_MAX_VALUE - 4, True, "Should read: many possible ids"),
+    ]
+
+    for literal, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, GreaterThan("id", literal)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_integer_gt_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    """GreaterThanOrEqual on "id" is skipped while the literal is above the upper bound (79)."""
+    cases = [
+        (INT_MAX_VALUE + 6, False, "Should not read: id range above upper bound (85 > 79)"),
+        (INT_MAX_VALUE + 1, False, "Should not read: id range above upper bound (80 > 79)"),
+        (INT_MAX_VALUE, True, "Should read: one possible id"),
+        (INT_MAX_VALUE - 4, True, "Should read: many possible ids"),
+    ]
+
+    for literal, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, GreaterThanOrEqual("id", literal)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_integer_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+    """EqualTo on "id" is read only for literals inside the recorded bounds [30, 79]."""
+    cases = [
+        (INT_MIN_VALUE - 25, False, "Should not read: id below lower bound"),
+        (INT_MIN_VALUE - 1, False, "Should not read: id below lower bound"),
+        (INT_MIN_VALUE, True, "Should read: id equal to lower bound"),
+        (INT_MAX_VALUE - 4, True, "Should read: id between lower and upper bounds"),
+        (INT_MAX_VALUE, True, "Should read: id equal to upper bound"),
+        (INT_MAX_VALUE + 1, False, "Should not read: id above upper bound"),
+        (INT_MAX_VALUE + 6, False, "Should not read: id above upper bound"),
+    ]
+
+    for literal, expected, message in cases:
+        should_read = _InclusiveMetricsEvaluator(schema_data_file, EqualTo("id", literal)).eval(data_file)
+        assert bool(should_read) is expected, message
+
+
+def test_integer_not_eq(schema_data_file: Schema, data_file: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MIN_VALUE - 25)).eval(data_file)
+ assert should_read, "Should read: id below lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MIN_VALUE - 1)).eval(data_file)
+ assert should_read, "Should read: id below lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MIN_VALUE)).eval(data_file)
+ assert should_read, "Should read: id equal to lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MAX_VALUE - 4)).eval(data_file)
+ assert should_read, "Should read: id between lower and upper bounds"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MAX_VALUE)).eval(data_file)
+ assert should_read, "Should read: id equal to upper bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MAX_VALUE + 1)).eval(data_file)
+ assert should_read, "Should read: id above upper bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotEqualTo("id", INT_MAX_VALUE + 6)).eval(data_file)
+ assert should_read, "Should read: id above upper bound"
+
+
+def test_integer_not_eq_rewritten(schema_data_file: Schema, data_file:
DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MIN_VALUE - 25))).eval(data_file)
+ assert should_read, "Should read: id below lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MIN_VALUE - 1))).eval(data_file)
+ assert should_read, "Should read: id below lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MIN_VALUE))).eval(data_file)
+ assert should_read, "Should read: id equal to lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MAX_VALUE - 4))).eval(data_file)
+ assert should_read, "Should read: id between lower and upper bounds"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MAX_VALUE))).eval(data_file)
+ assert should_read, "Should read: id equal to upper bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MAX_VALUE + 1))).eval(data_file)
+ assert should_read, "Should read: id above upper bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("id", INT_MAX_VALUE + 6))).eval(data_file)
+ assert should_read, "Should read: id above upper bound"
+
+
+def test_integer_case_insensitive_not_eq_rewritten(schema_data_file: Schema,
data_file: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MIN_VALUE - 25)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id below lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MIN_VALUE - 1)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id below lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MIN_VALUE)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id equal to lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MAX_VALUE - 4)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id between lower and upper bounds"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MAX_VALUE)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id equal to upper bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MAX_VALUE + 1)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id above upper bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
Not(EqualTo("ID", INT_MAX_VALUE + 6)), case_sensitive=False).eval(
+ data_file
+ )
+ assert should_read, "Should read: id above upper bound"
+
+
+def test_missing_column_case_sensitive(schema_data_file: Schema, data_file:
DataFile) -> None:
+ with pytest.raises(ValueError) as exc_info:
+ _ = _InclusiveMetricsEvaluator(schema_data_file, LessThan("ID", 22),
case_sensitive=True).eval(data_file)
+
+ assert str(exc_info.value) == "Could not find field with name ID,
case_sensitive=True"
+
+
+def test_integer_in(schema_data_file: Schema, data_file: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MIN_VALUE - 25, INT_MIN_VALUE - 24})).eval(data_file)
+ assert not should_read, "Should not read: id below lower bound (5 < 30, 6
< 30)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MIN_VALUE - 2, INT_MIN_VALUE - 1})).eval(data_file)
+ assert not should_read, "Should not read: id below lower bound (28 < 30,
29 < 30)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MIN_VALUE - 1, INT_MIN_VALUE})).eval(data_file)
+ assert should_read, "Should read: id equal to lower bound (30 == 30)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MAX_VALUE - 4, INT_MAX_VALUE - 3})).eval(data_file)
+ assert should_read, "Should read: id between lower and upper bounds (30 <
75 < 79, 30 < 76 < 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MAX_VALUE, INT_MAX_VALUE + 1})).eval(data_file)
+ assert should_read, "Should read: id equal to upper bound (79 == 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MAX_VALUE + 1, INT_MAX_VALUE + 2})).eval(data_file)
+ assert not should_read, "Should not read: id above upper bound (80 > 79,
81 > 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
{INT_MAX_VALUE + 6, INT_MAX_VALUE + 7})).eval(data_file)
+ assert not should_read, "Should not read: id above upper bound (85 > 79,
86 > 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("all_nulls",
{"abc", "def"})).eval(data_file)
+ assert not should_read, "Should skip: in on all nulls column"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
In("some_nulls", {"abc", "def"})).eval(data_file)
+ assert should_read, "Should read: in on some nulls column"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("no_nulls",
{"abc", "def"})).eval(data_file)
+ assert should_read, "Should read: in on no nulls column"
+
+ ids = list(range(400))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, In("id",
ids)).eval(data_file)
+ assert should_read, "Should read: large in expression"
+
+
+def test_integer_not_in(schema_data_file: Schema, data_file: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MIN_VALUE - 25, INT_MIN_VALUE - 24})).eval(
+ data_file
+ )
+ assert should_read, "Should read: id below lower bound (5 < 30, 6 < 30)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MIN_VALUE - 2, INT_MIN_VALUE - 1})).eval(
+ data_file
+ )
+ assert should_read, "Should read: id below lower bound (28 < 30, 29 < 30)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MIN_VALUE - 1, INT_MIN_VALUE})).eval(data_file)
+ assert should_read, "Should read: id equal to lower bound (30 == 30)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MAX_VALUE - 4, INT_MAX_VALUE - 3})).eval(
+ data_file
+ )
+ assert should_read, "Should read: id between lower and upper bounds (30 <
75 < 79, 30 < 76 < 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MAX_VALUE, INT_MAX_VALUE + 1})).eval(data_file)
+ assert should_read, "Should read: id equal to upper bound (79 == 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MAX_VALUE + 1, INT_MAX_VALUE + 2})).eval(
+ data_file
+ )
+ assert should_read, "Should read: id above upper bound (80 > 79, 81 > 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file, NotIn("id",
{INT_MAX_VALUE + 6, INT_MAX_VALUE + 7})).eval(
+ data_file
+ )
+ assert should_read, "Should read: id above upper bound (85 > 79, 86 > 79)"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotIn("all_nulls", {"abc", "def"})).eval(data_file)
+ assert should_read, "Should read: notIn on all nulls column"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotIn("some_nulls", {"abc", "def"})).eval(data_file)
+ assert should_read, "Should read: notIn on some nulls column"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file,
NotIn("no_nulls", {"abc", "def"})).eval(data_file)
+ assert should_read, "Should read: notIn on no nulls column"
+
+
[email protected]
+def schema_data_file_nan() -> Schema:
+ return Schema(
+ NestedField(1, "all_nan", DoubleType(), required=True),
+ NestedField(2, "max_nan", DoubleType(), required=True),
+ NestedField(3, "min_max_nan", FloatType(), required=False),
+ NestedField(4, "all_nan_null_bounds", DoubleType(), required=True),
+ NestedField(5, "some_nan_correct_bounds", FloatType(), required=False),
+ )
+
+
[email protected]
+def data_file_nan() -> DataFile:
+ return DataFile(
+ file_path="file.avro",
+ file_format=FileFormat.PARQUET,
+ partition={},
+ record_count=50,
+ file_size_in_bytes=3,
+ column_sizes={
+ 1: 10,
+ 2: 10,
+ 3: 10,
+ 4: 10,
+ 5: 10,
+ },
+ value_counts={
+ 1: 10,
+ 2: 10,
+ 3: 10,
+ 4: 10,
+ 5: 10,
+ },
+ null_value_counts={
+ 1: 0,
+ 2: 0,
+ 3: 0,
+ 4: 0,
+ 5: 0,
+ },
+ nan_value_counts={1: 10, 4: 10, 5: 5},
+ lower_bounds={
+ 1: to_bytes(DoubleType(), float("nan")),
+ 2: to_bytes(DoubleType(), 7),
+ 3: to_bytes(FloatType(), float("nan")),
+ 5: to_bytes(FloatType(), 7),
+ },
+ upper_bounds={
+ 1: to_bytes(DoubleType(), float("nan")),
+ 2: to_bytes(DoubleType(), float("nan")),
+ 3: to_bytes(FloatType(), float("nan")),
+ 5: to_bytes(FloatType(), 22),
+ },
+ )
+
+
+def
test_inclusive_metrics_evaluator_less_than_and_less_than_equal(schema_data_file_nan:
Schema, data_file_nan: DataFile) -> None:
+ for operator in [LessThan, LessThanOrEqual]:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("all_nan", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't
contain number"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("max_nan", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: 1 is smaller than lower
bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("max_nan", 10)).eval(data_file_nan)
+ assert should_read, "Should match: 10 is larger than lower bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("min_max_nan", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("all_nan_null_bounds", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't
contain number"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("some_nan_correct_bounds", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: 1 is smaller than lower
bound"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("some_nan_correct_bounds", 10)).eval(
+ data_file_nan
+ )
+ assert should_read, "Should match: 10 larger than lower bound"
+
+
+def test_inclusive_metrics_evaluator_greater_than_and_greater_than_equal(
+ schema_data_file_nan: Schema, data_file_nan: DataFile
+) -> None:
+ for operator in [GreaterThan, GreaterThanOrEqual]:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("all_nan", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't
contain number"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("max_nan", 1)).eval(data_file_nan)
+ assert should_read, "Should match: upper bound is larger than 1"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("max_nan", 10)).eval(data_file_nan)
+ assert should_read, "Should match: upper bound is larger than 10"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("min_max_nan", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("all_nan_null_bounds", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't
contain number"
+
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("some_nan_correct_bounds", 1)).eval(data_file_nan)
+ assert should_read, "Should match: 1 is smaller than upper bound"
-def _record_simple(id: int, data: Optional[str]) -> Record: # pylint:
disable=redefined-builtin
- r = Record(struct=SIMPLE_SCHEMA.as_struct())
- r[0] = id
- r[1] = data
- return r
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("some_nan_correct_bounds", 10)).eval(
+ data_file_nan
+ )
+ assert should_read, "Should match: 10 is smaller than upper bound"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
operator("all_nan", 30)).eval(data_file_nan)
+ assert not should_read, "Should not match: 30 is greater than upper
bound"
-def _record_float(id: float, f: float) -> Record: # pylint:
disable=redefined-builtin
- r = Record(struct=FLOAT_SCHEMA.as_struct())
- r[0] = id
- r[1] = f
- return r
+def test_inclusive_metrics_evaluator_equals(schema_data_file_nan: Schema,
data_file_nan: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("all_nan", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't contain
number"
-def test_true() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, AlwaysTrue(),
case_sensitive=True)
- assert evaluate(Record(1, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("max_nan", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: 1 is smaller than lower bound"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("max_nan", 10)).eval(data_file_nan)
+ assert should_read, "Should match: 10 is within bounds"
-def test_false() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, AlwaysFalse(),
case_sensitive=True)
- assert not evaluate(Record(1, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("min_max_nan", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("all_nan_null_bounds", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't contain
number"
-def test_less_than() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, LessThan("id", 3),
case_sensitive=True)
- assert evaluate(Record(2, "a"))
- assert not evaluate(Record(3, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("some_nan_correct_bounds", 1)).eval(data_file_nan)
+ assert not should_read, "Should not match: 1 is smaller than lower bound"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("some_nan_correct_bounds", 10)).eval(data_file_nan)
+ assert should_read, "Should match: 10 is within bounds"
-def test_less_than_or_equal() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, LessThanOrEqual("id", 3),
case_sensitive=True)
- assert evaluate(Record(1, "a"))
- assert evaluate(Record(3, "a"))
- assert not evaluate(Record(4, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
EqualTo("all_nan", 30)).eval(data_file_nan)
+ assert not should_read, "Should not match: 30 is greater than upper bound"
-def test_greater_than() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, GreaterThan("id", 3),
case_sensitive=True)
- assert not evaluate(Record(1, "a"))
- assert not evaluate(Record(3, "a"))
- assert evaluate(Record(4, "a"))
+def test_inclusive_metrics_evaluator_not_equals(schema_data_file_nan: Schema,
data_file_nan: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("all_nan", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("max_nan", 10)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_greater_than_or_equal() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, GreaterThanOrEqual("id",
3), case_sensitive=True)
- assert not evaluate(Record(2, "a"))
- assert evaluate(Record(3, "a"))
- assert evaluate(Record(4, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("max_nan", 10)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("min_max_nan", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_equal_to() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, EqualTo("id", 3),
case_sensitive=True)
- assert not evaluate(Record(2, "a"))
- assert evaluate(Record(3, "a"))
- assert not evaluate(Record(4, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("all_nan_null_bounds", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("some_nan_correct_bounds", 1)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_not_equal_to() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, NotEqualTo("id", 3),
case_sensitive=True)
- assert evaluate(Record(2, "a"))
- assert not evaluate(Record(3, "a"))
- assert evaluate(Record(4, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("some_nan_correct_bounds", 10)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotEqualTo("some_nan_correct_bounds", 30)).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_in() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, In("id", [1, 2, 3]),
case_sensitive=True)
- assert evaluate(Record(2, "a"))
- assert evaluate(Record(3, "a"))
- assert not evaluate(Record(4, "a"))
+def test_inclusive_metrics_evaluator_in(schema_data_file_nan: Schema,
data_file_nan: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("all_nan", (1, 10, 30))).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't contain
number"
-def test_not_in() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, NotIn("id", [1, 2, 3]),
case_sensitive=True)
- assert not evaluate(Record(2, "a"))
- assert not evaluate(Record(3, "a"))
- assert evaluate(Record(4, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("max_nan", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: 10 and 30 are greater than lower bound"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("min_max_nan", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_is_null() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, IsNull("data"),
case_sensitive=True)
- assert not evaluate(Record(2, "a"))
- assert evaluate(Record(3, None))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("all_nan_null_bounds", (1, 10, 30))).eval(data_file_nan)
+ assert not should_read, "Should not match: all nan column doesn't contain
number"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("some_nan_correct_bounds", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: 10 within bounds"
-def test_not_null() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, NotNull("data"),
case_sensitive=True)
- assert evaluate(Record(2, "a"))
- assert not evaluate(Record(3, None))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("some_nan_correct_bounds", (1, 30))).eval(data_file_nan)
+ assert not should_read, "Should not match: 1 and 30 not within bounds"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("some_nan_correct_bounds", (5, 7))).eval(data_file_nan)
+ assert should_read, "Should match: overlap with lower bound"
-def test_is_nan() -> None:
- evaluate = expression_evaluator(FLOAT_SCHEMA, IsNaN("f"),
case_sensitive=True)
- assert not evaluate(_record_float(2, f=0.0))
- assert not evaluate(_record_float(3, f=float("infinity")))
- assert evaluate(_record_float(4, f=float("nan")))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
In("some_nan_correct_bounds", (22, 25))).eval(data_file_nan)
+ assert should_read, "Should match: overlap with upper bounds"
-def test_not_nan() -> None:
- evaluate = expression_evaluator(FLOAT_SCHEMA, NotNaN("f"),
case_sensitive=True)
- assert evaluate(_record_float(2, f=0.0))
- assert evaluate(_record_float(3, f=float("infinity")))
- assert not evaluate(_record_float(4, f=float("nan")))
+def test_inclusive_metrics_evaluator_not_in(schema_data_file_nan: Schema,
data_file_nan: DataFile) -> None:
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("all_nan", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("max_nan", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_not() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, Not(LessThan("id", 3)),
case_sensitive=True)
- assert not evaluate(Record(2, "a"))
- assert evaluate(Record(3, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("max_nan", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("min_max_nan", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_and() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, And(LessThan("id", 3),
GreaterThan("id", 1)), case_sensitive=True)
- assert not evaluate(Record(1, "a"))
- assert evaluate(Record(2, "a"))
- assert not evaluate(Record(3, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("all_nan_null_bounds", (1, 10, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("some_nan_correct_bounds", (1, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"
-def test_or() -> None:
- evaluate = expression_evaluator(SIMPLE_SCHEMA, Or(LessThan("id", 2),
GreaterThan("id", 2)), case_sensitive=True)
- assert evaluate(Record(1, "a"))
- assert not evaluate(Record(2, "a"))
- assert evaluate(Record(3, "a"))
+ should_read = _InclusiveMetricsEvaluator(schema_data_file_nan,
NotIn("some_nan_correct_bounds", (1, 30))).eval(data_file_nan)
+ assert should_read, "Should match: no visibility"