rdblue commented on code in PR #5845:
URL: https://github.com/apache/iceberg/pull/5845#discussion_r993706996


##########
python/pyiceberg/expressions/base.py:
##########
@@ -867,3 +877,205 @@ def visit_unbound_predicate(self, predicate) -> BooleanExpression:
 
     def visit_bound_predicate(self, predicate) -> BooleanExpression:
         return predicate
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class _ManifestEvalVisitor(BoundBooleanExpressionVisitor[bool]):
+    partition_fields: list[PartitionFieldSummary]
+    partition_filter: BooleanExpression
+
+    def __init__(self, partition_struct_schema: Schema, partition_filter: UnboundPredicate, case_sensitive: bool = True):
+        bound_partition_filter = partition_filter.bind(partition_struct_schema, case_sensitive)
+        self.partition_filter = rewrite_not(bound_partition_filter)
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if all(lower > val.value for val in literals):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+            if all(upper < val.value for val in literals):
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        # because the bounds are not necessarily a min or max value, this cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.contains_nan is False:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.contains_nan is True and field.contains_null is False and field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_null(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+
+        if self.partition_fields[pos].contains_null is False:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_null(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+
+        # contains_null encodes whether at least one partition value is null,
+        # lowerBound is null if all partition values are null
+        all_null = self.partition_fields[pos].contains_null is True and self.partition_fields[pos].lower_bound is None
+
+        if all_null and type(term.ref().field.field_type) in {DoubleType, FloatType}:
+            # floating point types may include NaN values, which we check separately.
+            # In case bounds don't include NaN value, contains_nan needs to be checked against.
+            all_null = self.partition_fields[pos].contains_nan is False
+
+        if all_null:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            # values are all null and literal cannot contain null
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if lower > literal.value:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if literal.value > upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        # because the bounds are not necessarily a min or max value, this cannot be answered using
+        # them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than_or_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.upper_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+
+        if literal.value > upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.upper_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+
+        if literal.value >= upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if literal.value <= lower:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than_or_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if literal.value < lower:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_true(self) -> bool:
+        return ROWS_MIGHT_MATCH
+
+    def visit_false(self) -> bool:
+        return ROWS_CANNOT_MATCH
+
+    def visit_not(self, child_result: bool) -> bool:
+        return not child_result
+
+    def visit_and(self, left_result: bool, right_result: bool) -> bool:
+        return left_result and right_result
+
+    def visit_or(self, left_result: bool, right_result: bool) -> bool:
+        return left_result or right_result
+
+
+def manifest_evaluator(
+    partition_spec: PartitionSpec, schema: Schema, partition_filter: UnboundPredicate, case_sensitive: bool = True
+) -> Callable[[ManifestFile], bool]:
+    partition_schema = Schema(*partition_spec.partition_type(schema))

Review Comment:
   I don't think it should be necessary to create a `Schema` from the `StructType`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to