syun64 commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1678319796
##########
pyiceberg/table/__init__.py:
##########
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties:
Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
append_files.append_data_file(data_file)
+ def _build_partition_predicate(self, spec_id: int, delete_partitions:
List[Record]) -> BooleanExpression:
+ partition_spec = self.table_metadata.spec()
+ schema = self.table_metadata.schema()
+ partition_fields = [schema.find_field(field.source_id).name for field
in partition_spec.fields]
+
+ expr: BooleanExpression = AlwaysFalse()
+ for partition_record in delete_partitions:
+ match_partition_expression: BooleanExpression = AlwaysTrue()
+
+ for pos in range(len(partition_fields)):
+ predicate = (
+ EqualTo(Reference(partition_fields[pos]),
partition_record[pos])
+ if partition_record[pos] is not None
+ else IsNull(Reference(partition_fields[pos]))
+ )
+ match_partition_expression = And(match_partition_expression,
predicate)
+ expr = Or(expr, match_partition_expression)
+ return expr
+
+ def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str,
str] = EMPTY_DICT) -> None:
+ """
+ Shorthand for adding a table dynamic overwrite with a PyArrow table to
the transaction.
+
+ Args:
+ df: The Arrow dataframe that will be used to overwrite the table
+ snapshot_properties: Custom properties to be added to the snapshot
summary
+ """
+ try:
+ import pyarrow as pa
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For writes PyArrow needs to be
installed") from e
+
+ if not isinstance(df, pa.Table):
+ raise ValueError(f"Expected PyArrow table, got: {df}")
+
+ _check_schema_compatible(self._table.schema(), other_schema=df.schema)
+
+ # cast if the two schemas are compatible but not equal
+ table_arrow_schema = self._table.schema().as_arrow()
+ if table_arrow_schema != df.schema:
+ df = df.cast(table_arrow_schema)
+
+ # If dataframe does not have data, there is no need to overwrite
+ if df.shape[0] == 0:
+ return
+
+ append_snapshot_commit_uuid = uuid.uuid4()
+ data_files: List[DataFile] = list(
+ _dataframe_to_data_files(
+ table_metadata=self._table.metadata,
write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+ )
+ )
+ with
self.update_snapshot(snapshot_properties=snapshot_properties).delete() as
delete_snapshot:
+ delete_partitions = [data_file.partition for data_file in
data_files]
+ delete_filter = self._build_partition_predicate(
+ spec_id=self.table_metadata.spec().spec_id,
delete_partitions=delete_partitions
+ )
+ delete_snapshot.delete_by_predicate(delete_filter)
+
+ with
self.update_snapshot(snapshot_properties=snapshot_properties).fast_append(
+ append_snapshot_commit_uuid
+ ) as append_snapshot:
Review Comment:
What are your thoughts on using `merge_append` when `MANIFEST_MERGE_ENABLED`
property is set to true? In the Java implementation append, overwrite and
replacePartitions (dynamic overwrite) all use the MergingSnapshotProducer which
respects this property.
```suggestion
manifest_merge_enabled = PropertyUtil.property_as_bool(
self.table_metadata.properties,
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)
update_snapshot =
self.update_snapshot(snapshot_properties=snapshot_properties)
append_method = update_snapshot.merge_append if
manifest_merge_enabled else update_snapshot.fast_append
with append_method() as append_snapshot:
```
##########
pyiceberg/table/__init__.py:
##########
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties:
Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
append_files.append_data_file(data_file)
+ def _build_partition_predicate(self, spec_id: int, delete_partitions:
List[Record]) -> BooleanExpression:
+ partition_spec = self.table_metadata.spec()
+ schema = self.table_metadata.schema()
+ partition_fields = [schema.find_field(field.source_id).name for field
in partition_spec.fields]
+
+ expr: BooleanExpression = AlwaysFalse()
+ for partition_record in delete_partitions:
+ match_partition_expression: BooleanExpression = AlwaysTrue()
+
+ for pos in range(len(partition_fields)):
+ predicate = (
+ EqualTo(Reference(partition_fields[pos]),
partition_record[pos])
+ if partition_record[pos] is not None
+ else IsNull(Reference(partition_fields[pos]))
+ )
+ match_partition_expression = And(match_partition_expression,
predicate)
+ expr = Or(expr, match_partition_expression)
+ return expr
+
+ def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str,
str] = EMPTY_DICT) -> None:
+ """
+ Shorthand for adding a table dynamic overwrite with a PyArrow table to
the transaction.
+
+ Args:
+ df: The Arrow dataframe that will be used to overwrite the table
+ snapshot_properties: Custom properties to be added to the snapshot
summary
+ """
+ try:
+ import pyarrow as pa
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For writes PyArrow needs to be
installed") from e
+
+ if not isinstance(df, pa.Table):
+ raise ValueError(f"Expected PyArrow table, got: {df}")
+
+ _check_schema_compatible(self._table.schema(), other_schema=df.schema)
+
+ # cast if the two schemas are compatible but not equal
+ table_arrow_schema = self._table.schema().as_arrow()
+ if table_arrow_schema != df.schema:
+ df = df.cast(table_arrow_schema)
Review Comment:
Just an FYI: there's a
[PR](https://github.com/apache/iceberg-python/pull/921/files) that changes this
block of code
##########
pyiceberg/table/__init__.py:
##########
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties:
Dict[str, str] = EMPTY_DICT)
for data_file in data_files:
append_files.append_data_file(data_file)
+ def _build_partition_predicate(self, spec_id: int, delete_partitions:
List[Record]) -> BooleanExpression:
Review Comment:
nit: I found the `delete_partitions` argument a bit confusing here, because
this function just translates a set of partition record values to its
corresponding predicate. Could we rename it to something more generic to
indicate that? W should also remove `spec_id` which isn't used in this function
```suggestion
def _build_partition_predicate(self, partition_records: List[Record]) ->
BooleanExpression:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]