Fokko commented on code in PR #5845:
URL: https://github.com/apache/iceberg/pull/5845#discussion_r985498605


##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and 
partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check 
separately.
+        # In case bounds don't include NaN value, containsNaN needs to be 
checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want

Review Comment:
   It's good that I added that comment. I don't really like setting a class 
variable there, so I've split it out into a separate class.



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and 
partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check 
separately.
+        # In case bounds don't include NaN value, containsNaN needs to be 
checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if all([lower < val.value for val in literals]):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, 
field.upper_bound)
+            if all([upper > val.value for val in literals]):
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> 
bool:
+        # because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a 
value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if self.partition_fields[pos].contains_nan is False:
+            return ROWS_CANNOT_MATCH
+
+        if all_values_are_null(field, term.ref().field.field_type):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.contains_nan is True and field.contains_null is False and 
field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_null(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+
+        if self.partition_fields[pos].contains_null is False:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_null(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+
+        if all_values_are_null(self.partition_fields[pos], 
term.ref().field.field_type):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            # values are all null and literal cannot contain null
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if lower > literal.value:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if literal.value > upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        # because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        # them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value 
in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than_or_equal(self, term: BoundTerm, literal: 
Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.upper_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, 
field.upper_bound)
+
+        if literal.value > upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than(self, term: BoundTerm, literal: Literal[Any]) -> 
bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.upper_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, 
field.upper_bound)
+
+        if literal.value >= upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if literal.value <= lower:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than_or_equal(self, term: BoundTerm, literal: Literal[Any]) 
-> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if literal.value < lower:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_true(self) -> bool:
+        return True

Review Comment:
   Nice, thanks!



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and 
partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check 
separately.
+        # In case bounds don't include NaN value, containsNaN needs to be 
checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if all([lower < val.value for val in literals]):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, 
field.upper_bound)
+            if all([upper > val.value for val in literals]):
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> 
bool:
+        # because the bounds are not necessarily a min or max value, this 
cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a 
value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if self.partition_fields[pos].contains_nan is False:

Review Comment:
   It should, thanks!



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:

Review Comment:
   Good call



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and 
partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check 
separately.
+        # In case bounds don't include NaN value, containsNaN needs to be 
checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if all([lower < val.value for val in literals]):

Review Comment:
   Ahh, great catch! It is flipped indeed, thanks! A test has been added.



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and 
partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check 
separately.
+        # In case bounds don't include NaN value, containsNaN needs to be 
checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if all([lower < val.value for val in literals]):

Review Comment:
   I implemented this using the built-in `all` function since it will stop the 
loop early when a `False` is encountered.



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: 
BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) 
-> T:
     return visitor.visit_less_than_or_equal(term=expr.term, 
literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: 
IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and 
partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check 
separately.
+        # In case bounds don't include NaN value, containsNaN needs to be 
checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, 
field.lower_bound)
+
+        if all([lower < val.value for val in literals]):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, 
field.upper_bound)
+            if all([upper > val.value for val in literals]):

Review Comment:
   At least it is consistently reversed, thanks! Updated and added a test 👍🏻 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to