smaheshwar-pltr commented on code in PR #2234:
URL: https://github.com/apache/iceberg-python/pull/2234#discussion_r2222504778
##########
pyiceberg/table/__init__.py:
##########
@@ -1900,76 +1965,194 @@ def plan_files(self) -> Iterable[FileScanTask]:
         if not snapshot:
             return iter([])
-        # step 1: filter manifests using partition summaries
-        # the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id
-
-        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
+        return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io))
-        residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)
+    def to_arrow(self) -> pa.Table:
+        """Read an Arrow table eagerly from this DataScan.
-        manifests = [
-            manifest_file
-            for manifest_file in snapshot.manifests(self.io)
-            if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
-        ]
+        All rows will be loaded into memory at once.
-        # step 2: filter the data files in each manifest
-        # this filter depends on the partition spec used to write the manifest file
+        Returns:
+            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
+        """
+        from pyiceberg.io.pyarrow import ArrowScan
-        partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
+        return ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_table(self.plan_files())
-        min_sequence_number = _min_sequence_number(manifests)
+    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
+        """Return an Arrow RecordBatchReader from this DataScan.
-        data_entries: List[ManifestEntry] = []
-        positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)
+        For large results, using a RecordBatchReader requires less memory than
+        loading an Arrow Table for the same DataScan, because a RecordBatch
+        is read one at a time.
-        executor = ExecutorFactory.get_or_create()
-        for manifest_entry in chain(
-            *executor.map(
-                lambda args: _open_manifest(*args),
-                [
-                    (
-                        self.io,
-                        manifest,
-                        partition_evaluators[manifest.partition_spec_id],
-                        self._build_metrics_evaluator(),
-                    )
-                    for manifest in manifests
-                    if self._check_sequence_number(min_sequence_number, manifest)
-                ],
-            )
-        ):
-            data_file = manifest_entry.data_file
-            if data_file.content == DataFileContent.DATA:
-                data_entries.append(manifest_entry)
-            elif data_file.content == DataFileContent.POSITION_DELETES:
-                positional_delete_entries.add(manifest_entry)
-            elif data_file.content == DataFileContent.EQUALITY_DELETES:
-                raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
+        Returns:
+            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
+                which can be used to read a stream of record batches one by one.
+        """
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+
+        target_schema = schema_to_pyarrow(self.projection())
+        batches = ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_record_batches(self.plan_files())
+
+        return pa.RecordBatchReader.from_batches(
+            target_schema,
+            batches,
+        ).cast(target_schema)
+
+    def count(self) -> int:
+        from pyiceberg.io.pyarrow import ArrowScan
+
+        # Calculates the total number of records in the scan, accounting for positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # If the filter condition is fully satisfied by the partition values (task.residual is
+            # AlwaysTrue) and no positional deletes are pending (task.delete_files is empty), the
+            # metadata record count is exact. Otherwise the file has to be read as a pyarrow table,
+            # applying the filter and the deletes.
+            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+                # Every file stores its record count in its metadata
+                res += task.file.record_count
             else:
-                raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                )
+                tbl = arrow_scan.to_table([task])
+                res += len(tbl)
+        return res
-        return [
-            FileScanTask(
-                data_entry.data_file,
-                delete_files=_match_deletes_to_data_file(
-                    data_entry,
-                    positional_delete_entries,
-                ),
-                residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
-                    data_entry.data_file.partition
-                ),
-            )
-            for data_entry in data_entries
-        ]
+
+class IncrementalAppendScan(AbstractTableScan):
+    """An incremental scan of a table's data that accumulates appended data between two snapshots.
+
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is
+            ultimately planned, this must not be None.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, inclusively.
+            Omitting it will default to the table's current snapshot.
+    """
+
+    from_snapshot_id_exclusive: Optional[int]
+    to_snapshot_id_inclusive: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+    ):
+        super().__init__(
+            table_metadata,
+            io,
+            row_filter,
+            selected_fields,
+            case_sensitive,
+            options,
+            limit,
+        )
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
+
+    def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
+
+        Returns:
+            this for method chaining
+        """
+        return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
+
+    def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes up to a particular snapshot (inclusive).
+
+        Args:
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
+
+        Returns:
+            this for method chaining
+        """
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
+
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+
+        if "*" in self.selected_fields:
+            return current_schema
+
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
+    def plan_files(self) -> Iterable[FileScanTask]:
+        from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()
+
+        append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata)
+        if len(append_snapshots) == 0:
+            return iter([])
+
+        append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}
+
+        manifests = {
Review Comment:
https://github.com/apache/iceberg-python/pull/2031#discussion_r2102624828
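
For context on the two read paths in the diff, here is a minimal usage sketch. It assumes a table loaded from a catalog; the catalog and table names are hypothetical, and process is a placeholder for per-batch work. scan(), to_arrow(), and to_arrow_batch_reader() are the entry points shown above.

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("default")        # hypothetical catalog name
    table = catalog.load_table("db.events")  # hypothetical table name

    # Eager path: the whole result set is materialized as one pa.Table.
    arrow_table = table.scan(row_filter="status = 'active'").to_arrow()
    print(arrow_table.num_rows)

    # Streaming path: batches are produced one at a time, bounding memory use.
    reader = table.scan(row_filter="status = 'active'").to_arrow_batch_reader()
    for batch in reader:
        process(batch)  # placeholder for per-batch work

Note that the batch reader is created with the projected schema and then cast to it, so every emitted batch matches the projection even when the underlying files were written with older schemas.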
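The count() fast path leans on Iceberg's file-level statistics: when a task's residual is AlwaysTrue() and it carries no positional deletes, the metadata record count is already exact, so no data needs to be read. A rough standalone sketch of that decision, assuming a scan object whose plan_files() yields FileScanTask objects with the fields shown in the diff:

    from pyiceberg.expressions import AlwaysTrue

    def metadata_countable(task) -> bool:
        # Exact from metadata only when the filter is fully answered by
        # partition values and no positional deletes are pending.
        return task.residual == AlwaysTrue() and len(task.delete_files) == 0

    fast = sum(t.file.record_count for t in scan.plan_files() if metadata_countable(t))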
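Finally, the builder-style setters on IncrementalAppendScan return a fresh scan via self.update(...), so the snapshot bounds can be chained. A sketch constructing the scan directly through the __init__ shown in the diff (the snapshot IDs are made up, and whether the PR also adds a Table-level factory is not shown here):

    scan = IncrementalAppendScan(
        table_metadata=table.metadata,
        io=table.io,
        row_filter="status = 'active'",
    )
    # Each setter returns a new scan for chaining; IDs here are hypothetical.
    scan = scan.from_snapshot_exclusive(1234567890).to_snapshot_inclusive(9876543210)
    tasks = list(scan.plan_files())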
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]