kevinjqliu commented on code in PR #3511:
URL: https://github.com/apache/iceberg-python/pull/3511#discussion_r3447163440


##########
pyiceberg/table/__init__.py:
##########
@@ -2027,74 +2099,19 @@ def _min_sequence_number(manifests: list[ManifestFile]) 
-> int:
 
 
 class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), 
self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
-
     @cached_property
-    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
-
-    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), 
self.partition_filters[spec_id], self.case_sensitive)
-
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], 
bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, 
partition_expr, self.case_sensitive)(data_file.partition)
-
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = 
strtobool(self.options.get("include_empty_files", "false"))
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a 
single
-        # shared instance across multiple threads.
-        return lambda data_file: _InclusiveMetricsEvaluator(
-            schema,
-            self.row_filter,
-            self.case_sensitive,
-            include_empty_files,
-        ).eval(data_file)
-
-    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], 
ResidualEvaluator]:
-        spec = self.table_metadata.specs()[spec_id]
-
-        from pyiceberg.expressions.visitors import residual_evaluator_of
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda datafile: residual_evaluator_of(
-            spec=spec,
-            expr=self.row_filter,
+    def _manifest_planner(self) -> ManifestGroupPlanner:
+        return ManifestGroupPlanner(
+            table_metadata=self.table_metadata,
+            io=self.io,
+            row_filter=self.row_filter,
             case_sensitive=self.case_sensitive,
-            schema=self.table_metadata.schema(),
+            options=self.options,
         )
 
-    @staticmethod
-    def _check_sequence_number(min_sequence_number: int, manifest: 
ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are 
older than the data.
-
-        Args:
-            min_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or 
deletes.
-
-        Returns:
-            Boolean indicating if it is either a data file, or a relevant 
delete file.
-        """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= 
min_sequence_number
-        )
+    @property

Review Comment:
   ```suggestion
       @cached_property
   ```
   
   lets keep the cached_property on this for compatibility



##########
pyiceberg/table/__init__.py:
##########
@@ -2027,74 +2099,19 @@ def _min_sequence_number(manifests: list[ManifestFile]) 
-> int:
 
 
 class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), 
self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
-
     @cached_property
-    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
-
-    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), 
self.partition_filters[spec_id], self.case_sensitive)
-
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], 
bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, 
partition_expr, self.case_sensitive)(data_file.partition)
-
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = 
strtobool(self.options.get("include_empty_files", "false"))
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a 
single
-        # shared instance across multiple threads.
-        return lambda data_file: _InclusiveMetricsEvaluator(
-            schema,
-            self.row_filter,
-            self.case_sensitive,
-            include_empty_files,
-        ).eval(data_file)
-
-    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], 
ResidualEvaluator]:
-        spec = self.table_metadata.specs()[spec_id]
-
-        from pyiceberg.expressions.visitors import residual_evaluator_of
-
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda datafile: residual_evaluator_of(
-            spec=spec,
-            expr=self.row_filter,
+    def _manifest_planner(self) -> ManifestGroupPlanner:
+        return ManifestGroupPlanner(
+            table_metadata=self.table_metadata,
+            io=self.io,
+            row_filter=self.row_filter,
             case_sensitive=self.case_sensitive,
-            schema=self.table_metadata.schema(),
+            options=self.options,
         )
 
-    @staticmethod
-    def _check_sequence_number(min_sequence_number: int, manifest: 
ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are 
older than the data.
-
-        Args:
-            min_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or 
deletes.
-
-        Returns:
-            Boolean indicating if it is either a data file, or a relevant 
delete file.
-        """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= 
min_sequence_number
-        )
+    @property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return self._manifest_planner.partition_filters
 
     def scan_plan_helper(self) -> Iterator[list[ManifestEntry]]:

Review Comment:
   `scan_plan_helper` should be internal. it was introduced in 
https://github.com/apache/iceberg-python/commit/d99936a6aa1758577c27532eb4f91bd15053ce92#diff-23e8153e0fd497a9212215bd2067068f3b56fa071770c7ef326db3d3d03cee9bR1946
   
   Previously, the local planning path was:
   ```
   plan_files()
     -> _plan_files_local()
     -> self.scan_plan_helper()
   ```
   
   With this refactor, local planning now goes through the extracted planner 
directly:
   ```
   plan_files()
     -> _plan_files_local()
     -> self._manifest_planner.plan_files(...)
   ```
   
   Since scan_plan_helper is an internal helper rather than a public extension 
point, this change seems fine. We should follow up and append an `_` to the 
function name



##########
pyiceberg/table/__init__.py:
##########
@@ -1817,20 +1816,127 @@ def __init__(
         row_filter: str | BooleanExpression = ALWAYS_TRUE,
         selected_fields: tuple[str, ...] = ("*",),
         case_sensitive: bool = True,
-        snapshot_id: int | None = None,
         options: Properties = EMPTY_DICT,
         limit: int | None = None,
-        catalog: Catalog | None = None,
-        table_identifier: Identifier | None = None,
     ):
         self.table_metadata = table_metadata
         self.io = io
         self.row_filter = _parse_row_filter(row_filter)
         self.selected_fields = selected_fields
         self.case_sensitive = case_sensitive
-        self.snapshot_id = snapshot_id
         self.options = options
         self.limit = limit
+
+    @abstractmethod
+    def projection(self) -> Schema: ...
+

Review Comment:
   ```suggestion
   ```
   
   nit: this is net new
   
   previously was not an `abstractmethod`
   
https://github.com/apache/iceberg-python/blob/6da06adfa82eda8d647060632115e75a35634b87/pyiceberg/table/__init__.py#L1842
   
   i think we can remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to