This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new d26d1e48 Avoid reusing shared metrics evaluator (#1664)
d26d1e48 is described below

commit d26d1e48f48e42e4fae8f8a2dccecb7dbb009840
Author: rachel88888 <[email protected]>
AuthorDate: Tue Feb 18 08:04:04 2025 -0500

    Avoid reusing shared metrics evaluator (#1664)
    
    Hello!
    
    I have noticed the same issue as Issue #1506 where the number of results
    retrieved is inconsistent across reads and traced the issue to the reuse
    of the same metrics evaluator across threads when reading manifests.
    Because the metrics evaluator is stateful, this will result in the wrong
    results being retrieved nondeterministically, depending on the execution
    order of the threads.
    
    This PR addresses the issue by building a fresh metrics evaluator for
    each evaluation instead of sharing one stateful instance across threads,
    which I have tested locally. Please let me know if there are any tests I
    can add, and I am happy to receive feedback.
    
    Thank you!
    
    Closes #1506
---
 pyiceberg/table/__init__.py | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index a409cd0e..8ff299ce 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1608,6 +1608,20 @@ class DataScan(TableScan):
         # shared instance across multiple threads.
         return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
+    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
+        schema = self.table_metadata.schema()
+        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
+        # shared instance across multiple threads.
+        return lambda data_file: _InclusiveMetricsEvaluator(
+            schema,
+            self.row_filter,
+            self.case_sensitive,
+            include_empty_files,
+        ).eval(data_file)
+
     def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
         spec = self.table_metadata.specs()[spec_id]
 
@@ -1671,13 +1685,6 @@ class DataScan(TableScan):
 
         partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
 
-        metrics_evaluator = _InclusiveMetricsEvaluator(
-            self.table_metadata.schema(),
-            self.row_filter,
-            self.case_sensitive,
-            strtobool(self.options.get("include_empty_files", "false")),
-        ).eval
-
         min_sequence_number = _min_sequence_number(manifests)
 
         data_entries: List[ManifestEntry] = []
@@ -1693,7 +1700,7 @@ class DataScan(TableScan):
                         manifest,
                         partition_evaluators[manifest.partition_spec_id],
                         residual_evaluators[manifest.partition_spec_id],
-                        metrics_evaluator,
+                        self._build_metrics_evaluator(),
                     )
                     for manifest in manifests
                     if self._check_sequence_number(min_sequence_number, manifest)

Reply via email to