This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new 7e66ccbf feat: Add tell() to OutputStream writers (#2998)
7e66ccbf is described below
commit 7e66ccbf16fc736df20f96c2ea030200fbe3a5e3
Author: geruh <[email protected]>
AuthorDate: Sun Feb 1 16:35:57 2026 -0800
feat: Add tell() to OutputStream writers (#2998)
# Rationale for this change
Currently, PyIceberg writes one manifest per snapshot operation
regardless of manifest size. To eventually support size-based manifest
rolling, we need to be able to track the number of bytes written without
closing the file, so that we can roll over to a new file once the target
size is reached.
Some of this work was started in #650, but we can keep this change simple
and add the rolling writer logic as a follow-up. Conveniently, the
underlying streams we support already have a tell() method, so we just
need to expose it.
With this change, the follow-up can do something like:
```
with write_manifest(...) as writer:
    writer.add_entry(entry)
    if writer.tell() >= target_file_size:
        # roll to new file
        ...
```
## Are these changes tested?
Yes, added a test: `test_manifest_writer_tell` in `tests/utils/test_manifest.py`.
## Are there any user-facing changes?
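No behavioral changes. Note that `OutputStream` now declares an abstract
`tell()` method, so custom `OutputStream` implementations need to provide
one. `AvroOutputFile` and `ManifestWriter` gain a public `tell()`.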
---
pyiceberg/avro/file.py | 3 +++
pyiceberg/io/__init__.py | 3 +++
pyiceberg/manifest.py | 3 +++
tests/utils/test_manifest.py | 35 +++++++++++++++++++++++++++++++++++
4 files changed, 44 insertions(+)
diff --git a/pyiceberg/avro/file.py b/pyiceberg/avro/file.py
index 8877e8bf..7db92818 100644
--- a/pyiceberg/avro/file.py
+++ b/pyiceberg/avro/file.py
@@ -317,3 +317,6 @@ class AvroOutputFile(Generic[D]):
self.encoder.write(block_content)
self.encoder.write(self.sync_bytes)
+
+ def tell(self) -> int:
+ return self.output_stream.tell()
diff --git a/pyiceberg/io/__init__.py b/pyiceberg/io/__init__.py
index 5592bc99..7dbc6512 100644
--- a/pyiceberg/io/__init__.py
+++ b/pyiceberg/io/__init__.py
@@ -140,6 +140,9 @@ class OutputStream(Protocol): # pragma: no cover
@abstractmethod
def write(self, b: bytes) -> int: ...
+ @abstractmethod
+ def tell(self) -> int: ...
+
@abstractmethod
def close(self) -> None: ...
diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py
index 4c68f5e3..cca0af76 100644
--- a/pyiceberg/manifest.py
+++ b/pyiceberg/manifest.py
@@ -1059,6 +1059,9 @@ class ManifestWriter(ABC):
self.closed = True
self._writer.__exit__(exc_type, exc_value, traceback)
+ def tell(self) -> int:
+ return self._writer.tell()
+
@abstractmethod
def content(self) -> ManifestContent: ...
diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py
index 3862d6b6..3f859b3b 100644
--- a/tests/utils/test_manifest.py
+++ b/tests/utils/test_manifest.py
@@ -897,3 +897,38 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
if len(references) > 1:
for ref in references[1:]:
assert ref is references[0], f"All references to manifest {i} should be the same object instance"
+
+
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_manifest_writer_tell(format_version: TableVersion) -> None:
+ io = load_file_io()
+ test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
+
+ with TemporaryDirectory() as tmpdir:
+ output_file = io.new_output(f"{tmpdir}/test-manifest.avro")
+ with write_manifest(
+ format_version=format_version,
+ spec=UNPARTITIONED_PARTITION_SPEC,
+ schema=test_schema,
+ output_file=output_file,
+ snapshot_id=1,
+ avro_compression="null",
+ ) as writer:
+ initial_bytes = writer.tell()
+ data_file = DataFile.from_args(
+ content=DataFileContent.DATA,
+ file_path=f"{tmpdir}/data.parquet",
+ file_format=FileFormat.PARQUET,
+ partition=Record(),
+ record_count=100,
+ file_size_in_bytes=1000,
+ )
+ entry = ManifestEntry.from_args(
+ status=ManifestEntryStatus.ADDED,
+ snapshot_id=1,
+ data_file=data_file,
+ )
+ writer.add_entry(entry)
+ after_entry_bytes = writer.tell()
+
+ assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry"