qzyu999 commented on code in PR #3320:
URL: https://github.com/apache/iceberg-python/pull/3320#discussion_r3269503332
##########
pyiceberg/table/update/snapshot.py:
##########
@@ -351,11 +359,39 @@ def new_manifest_output(self) -> OutputFile:
         location_provider = self._transaction._table.location_provider()
         file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
         file_path = location_provider.new_metadata_location(file_name)
+        self._written_manifests.append(file_path)
         return self._io.new_output(file_path)
     def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> list[ManifestEntry]:
         return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
+    def commit(self) -> None:
+        self._transaction._register_snapshot_producer(self)
+        self._transaction._apply(*self._commit())
+
+    def _cleanup_uncommitted(self) -> None:
+        """Delete manifest files from failed retry attempts."""
+        for path in self._uncommitted_manifests:
+            try:
+                self._io.delete(path)
+            except Exception:
+                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
+        self._uncommitted_manifests.clear()
Review Comment:
suggestion: We could also add a second similar function as follows:
```python
def _clean_all_uncommitted(self) -> None:
    """Clean up all manifests written during this producer's lifecycle on abort."""
    for path in itertools.chain(self._uncommitted_manifests, self._written_manifests):
        try:
            self._io.delete(path)
        except Exception:
            logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
    self._uncommitted_manifests.clear()
    self._written_manifests.clear()
```
Then, in `Transaction.commit_transaction()`, we can wrap the for-loop in a
try/except as follows:
```python
try:
    for attempt in range(num_retries + 1):
        try:
            self._table._do_commit(...)
            self._cleanup_uncommitted_manifests()
            break
        except CommitFailedException:
            ...  # retry logic
except Exception:
    # Catch ValidationException or retry exhaustion
    for producer in self._snapshot_producers:
        producer._clean_all_uncommitted()
    raise
```
This would let the PyIceberg implementation mirror the `cleanAll()` method in
Java. In the current implementation, the retry loop only clears the
`_uncommitted_manifests` left over from previous failed attempts.
`_clean_all_uncommitted` extends this by also deleting the `_written_manifests`
from the current attempt when the commit is permanently aborted. That closes
the gap where manifests orphaned by a `ValidationException` (or another
permanent failure) are never cleaned up. This fix could also be added cleanly
to this PR without waiting for a full delete-orphan-files implementation in
PyIceberg. WDYT about adding this into the current PR?
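
To make the abort path concrete, here is a minimal, self-contained sketch of the idea. It does not import PyIceberg: `InMemoryIO`, `Producer`, `commit_with_cleanup`, and the two exception classes are hypothetical stand-ins I made up for illustration, and the real retry logic (backoff, rebuilding snapshot updates) is left out.

```python
import itertools
import logging

logger = logging.getLogger(__name__)


class CommitFailedException(Exception):
    """Stand-in for a retryable commit conflict (assumption, not the PyIceberg class)."""


class ValidationException(Exception):
    """Stand-in for a permanent, non-retryable failure (assumption)."""


class InMemoryIO:
    """Hypothetical file IO that only tracks which paths exist."""

    def __init__(self, paths):
        self.files = set(paths)

    def delete(self, path):
        self.files.remove(path)  # raises KeyError if the path is missing


class Producer:
    """Hypothetical snapshot producer holding the two manifest lists."""

    def __init__(self, io, uncommitted, written):
        self._io = io
        self._uncommitted_manifests = list(uncommitted)
        self._written_manifests = list(written)

    def _clean_all_uncommitted(self):
        # Delete manifests from earlier failed attempts and from the current one.
        for path in itertools.chain(self._uncommitted_manifests, self._written_manifests):
            try:
                self._io.delete(path)
            except Exception:
                logger.warning("Failed to delete uncommitted manifest: %s", path, exc_info=True)
        self._uncommitted_manifests.clear()
        self._written_manifests.clear()


def commit_with_cleanup(do_commit, producers, num_retries):
    """Retry on CommitFailedException; on a permanent abort, clean up and re-raise."""
    try:
        for attempt in range(num_retries + 1):
            try:
                do_commit()
                break
            except CommitFailedException:
                if attempt == num_retries:
                    raise  # retries exhausted: handled by the outer except
    except Exception:
        for producer in producers:
            producer._clean_all_uncommitted()
        raise
```

On success nothing is deleted, since the manifests written by the last attempt are now part of the committed snapshot. Only a permanent failure, or running out of retries, reaches the outer handler.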
##########
pyiceberg/table/__init__.py:
##########
@@ -939,17 +975,73 @@ def commit_transaction(self) -> Table:
             The table with the updates applied.
         """
         if len(self._updates) > 0:
-            self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
-            self._table._do_commit(  # pylint: disable=W0212
-                updates=self._updates,
-                requirements=self._requirements,
+            from pyiceberg.utils.properties import property_as_int
+
+            properties = self._table.metadata.properties
+            num_retries_val = property_as_int(
+                properties, TableProperties.COMMIT_NUM_RETRIES, TableProperties.COMMIT_NUM_RETRIES_DEFAULT
+            )
+            num_retries = num_retries_val if num_retries_val is not None else TableProperties.COMMIT_NUM_RETRIES_DEFAULT
+            min_wait_val = property_as_int(
+                properties, TableProperties.COMMIT_MIN_RETRY_WAIT_MS, TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
+            )
+            min_wait_ms = min_wait_val if min_wait_val is not None else TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT
+            max_wait_val = property_as_int(
+                properties, TableProperties.COMMIT_MAX_RETRY_WAIT_MS, TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
             )
+            max_wait_ms = max_wait_val if max_wait_val is not None else TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT
+            total_timeout_val = property_as_int(
+                properties, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS, TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
+            )
+            total_timeout_ms = (
+                total_timeout_val if total_timeout_val is not None else TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT
+            )
+            start_time = time.monotonic()
+
+            for attempt in range(num_retries + 1):
+                try:
+                    self._requirements += (AssertTableUUID(uuid=self.table_metadata.table_uuid),)
Review Comment:
suggestion: `AssertTableUUID` is appended to `self._requirements` on every
iteration of the retry loop, and then removed again in
`_rebuild_snapshot_updates` with:
> `self._requirements = tuple(r for r in self._requirements if not isinstance(r, (AssertRefSnapshotId, AssertTableUUID)))`
This can be simplified by moving `self._requirements +=
(AssertTableUUID(uuid=self.table_metadata.table_uuid),)` outside the for-loop
and changing the line in `_rebuild_snapshot_updates` to:
> `self._requirements = tuple(r for r in self._requirements if not isinstance(r, AssertRefSnapshotId))`
`AssertTableUUID` stays constant for the whole commit, so the current code
just adds and removes the same requirement on every retry.
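
A rough sketch of the hoisted version, to show the shape of the change. The two dataclasses below are simplified stand-ins for the PyIceberg requirement classes, and `build_requirements_per_attempt` and the `"main"` ref are hypothetical names used only for this illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AssertTableUUID:
    """Simplified stand-in: invariant for the lifetime of the transaction."""

    uuid: str


@dataclass(frozen=True)
class AssertRefSnapshotId:
    """Simplified stand-in: changes whenever the table is refreshed for a retry."""

    ref: str
    snapshot_id: int


def build_requirements_per_attempt(table_uuid, snapshot_ids):
    """Return the requirements tuple seen by each attempt, one snapshot id per attempt."""
    # Added once, before the loop, because it never changes between retries.
    requirements = (AssertTableUUID(uuid=table_uuid),)
    seen = []
    for snapshot_id in snapshot_ids:
        # Rebuild step: drop only the stale per-attempt requirement.
        requirements = tuple(r for r in requirements if not isinstance(r, AssertRefSnapshotId))
        requirements += (AssertRefSnapshotId(ref="main", snapshot_id=snapshot_id),)
        seen.append(requirements)
    return seen
```

Each attempt then carries exactly one `AssertTableUUID` and one fresh `AssertRefSnapshotId`, without the add-and-remove churn inside the loop.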
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]