paultmathew opened a new pull request, #3336:
URL: https://github.com/apache/iceberg-python/pull/3336

   # Rationale for this change
   
   Builds on #3335. Please land that one first.
   
   PR #3335 added `pa.RecordBatchReader` as a valid input to 
`Table.append`/`Table.overwrite` using a buffered bin-pack approach. That 
implementation has two acknowledged caveats called out in its docstrings:
   
   1. **Memory bound**: peak memory is `N_workers × 
write.target-file-size-bytes` (~4 GiB at defaults) — better than "materialise 
everything" but not constant.
   2. **Byte semantics**: `write.target-file-size-bytes` is interpreted as 
uncompressed in-memory Arrow bytes via the `bin_pack_record_batches` helper, 
not on-disk compressed Parquet bytes. Resulting files are typically 3-10× 
smaller than the property suggests.
   
   This PR replaces the buffered approach with a rolling `pq.ParquetWriter` 
driven by `OutputStream.tell()` (added in #2998 specifically for this purpose). 
Both caveats go away:
   
   ```python
   # pyiceberg/io/pyarrow.py::_record_batches_to_data_files (excerpt)
   with output_file.create(overwrite=True) as fos:
       with pq.ParquetWriter(fos, schema=transformed_first.schema, ...) as writer:
           writer.write_batch(transformed_first, row_group_size=row_group_size)
           # Keep appending batches until the stream reports the target size.
           while fos.tell() < target_file_size:  # ← compressed on-disk bytes
               try:
                   batch = next(batches)
               except StopIteration:
                   break
               writer.write_batch(_transform(batch), row_group_size=row_group_size)
   ```
   
   What this delivers:
   
   - **Spec-correct file sizes**: `tell()` reports compressed on-disk bytes 
pyarrow has emitted to the stream, so `write.target-file-size-bytes` finally 
means what the spec says it means — matching the Java/Spark/Flink writers.
   - **Bounded memory independent of `target_file_size`**: peak RSS is bounded 
by one input batch + the Parquet page buffer (~1 MiB × columns) + the S3 
multipart upload pool (~5 MiB × ~8 in-flight parts). On a real S3 stack that's 
tens to a few hundred MiB, regardless of file size, dataset size, or number of 
files produced. See benchmark below.
   - **Each input `RecordBatch` becomes one Parquet row group**, with 
`write.parquet.row-group-limit` enforced as a per-row-group cap — identical 
treatment to the materialised `pa.Table` write path.
   - **No public API change.** Same `tx.append(reader)` / 
`tx.overwrite(reader)`. Internals only.
   - **`bin_pack_record_batches` is removed** (no longer needed). Its 4 unit 
tests are removed; the streaming behaviour is covered end-to-end by tests below.
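The bound in the second bullet can be written out as a rough estimate. The helper below is hypothetical and not part of pyiceberg. Its defaults are the approximate figures quoted above: about 1 MiB of page buffer per column and about 8 in-flight 5 MiB upload parts. It ignores interpreter and library baseline memory. What matters most is what the formula leaves out: `target_file_size` does not appear in it.

```python
MIB = 1024 * 1024


def streaming_working_set_mib(
    batch_bytes, n_columns, page_buffer_mib=1.0, part_mib=5.0, in_flight_parts=8
):
    """Hypothetical back-of-the-envelope estimate of the streaming writer's working
    set: one input batch + one page buffer per column + in-flight upload parts."""
    return batch_bytes / MIB + n_columns * page_buffer_mib + in_flight_parts * part_mib


# Benchmark-sized batch (5,000 rows x 108 bytes); the 3-column schema is assumed.
estimate = streaming_working_set_mib(batch_bytes=5_000 * 108, n_columns=3)
print(f"{estimate:.1f} MiB")  # 43.5 MiB; target_file_size never enters the formula
```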
   
   # Memory profile
   
   Streamed 1,000 batches × 5,000 rows × 108 bytes per row (≈ **515 MiB 
uncompressed**, **390 MiB on disk** after zstd of random alphanumeric payload, 
**24 files** written at `write.target-file-size-bytes = 16 MiB`) against AWS 
Glue + S3 (Aircall staging). Process RSS sampled at 19 Hz from a background 
thread. Detailed analysis below.
   
   | Metric | Value |
   |---|---|
   | Workload (uncompressed → on-disk) | 515 MiB → 390 MiB (1.3× zstd) |
   | Files written / row groups | 24 / 5,024 |
   | Wall time | 301 s (rate-limited by Python random-payload generation, not the writer) |
   | **Baseline RSS** | 178 MiB |
   | **Peak RSS** | **236 MiB** (delta +58 MiB; reached at t ≈ 15 s during the first file) |
   | Steady-state mean RSS | 167 MiB (≈ 11 MiB below baseline once Python GC reclaims import overhead) |
   | Steady-state p95 RSS | 229 MiB |
   | Steady-state σ | 18 MiB |
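The table's headline numbers can be re-derived from the workload parameters above. Here is a quick consistency check in plain Python. Every input is copied from this section, and nothing new is measured.

```python
MIB = 1024 * 1024

n_batches, rows_per_batch, bytes_per_row = 1_000, 5_000, 108
uncompressed_mib = n_batches * rows_per_batch * bytes_per_row / MIB
on_disk_mib = 390  # reported on-disk size after zstd
compression_ratio = uncompressed_mib / on_disk_mib
peak_delta_mib = 236 - 178  # peak RSS minus baseline RSS
# Every file except possibly the last reaches the 16 MiB target before rolling,
# so at most 24 full-size files fit in 390 MiB, consistent with 24 files written.
files_at_target = on_disk_mib / 16

print(round(uncompressed_mib), round(compression_ratio, 1), peak_delta_mib, files_at_target)
# 515 1.3 58 24.375
```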
   
   The key observation: after the initial ramp during the first file, **RSS oscillates within a ~30 MiB band across all 24 file rollovers and shows no growth from start to finish**. Memory is bounded by the in-flight `RecordBatch`, the Parquet page buffer, and the multipart upload pool. It does not depend on `target_file_size`, dataset size, or number of files produced. By contrast, the buffered approach in #3335 can hold up to `target_file_size × N_workers` of uncompressed Arrow buffers (≈ 4 GiB at defaults). That worst-case bound is **roughly 70× the +58 MiB peak delta measured here**.
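The ≈ 4 GiB and 70× figures come from simple arithmetic. The sketch assumes a 512 MiB default for `write.target-file-size-bytes`. It also assumes 8 writer threads, which is Python's `ThreadPoolExecutor` default of `min(32, os.cpu_count() + 4)` on a 4-core machine. Both values are assumptions chosen to reproduce the ≈ 4 GiB quoted above. The comparison also sets a worst-case bound against a measured delta, so treat the ratio as indicative only.

```python
MIB = 1024 * 1024
GIB = 1024 * MIB

target_file_size = 512 * MIB  # assumed pyiceberg default for write.target-file-size-bytes
n_workers = 8  # assumed: ThreadPoolExecutor default min(32, cpu_count + 4) on 4 cores
buffered_bound = n_workers * target_file_size  # #3335 worst case, uncompressed Arrow bytes
measured_delta = (236 - 178) * MIB  # streaming path: peak RSS minus baseline

print(buffered_bound / GIB)  # 4.0
print(round(buffered_bound / measured_delta))  # 71
```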
   
   <img width="1200" height="600" alt="smoke_memory_PR2" src="https://github.com/user-attachments/assets/4526d52b-3035-4402-b37b-2807c4d52023" />
   
   # Throughput / parallelism
   
   Streaming writes are sequential: one rolling file at a time. Single-stream throughput is bounded by the underlying multipart upload pool (~8 concurrent S3 PUTs in `pyarrow.fs.S3FileSystem`). That pool saturates typical network links and is rarely the bottleneck for streaming pipelines, where the upstream source (DB cursor, API, queue) is usually the limit. Callers who want maximum write throughput (backfills, dataset migrations) can materialise the input as a `pa.Table` and call `tx.append(table)`. That path uses the existing `executor.map`-based file-level parallelism, which this PR leaves **completely unchanged**.
   
   A hybrid worker pool for the streaming path (N concurrent rolling writers fed by a queue) is a possible follow-up if real workloads show streaming write throughput to be a bottleneck. This mirrors iceberg-go's roadmap: iceberg-go has shipped single-writer-only streaming since April 2025 (iceberg-go#369) without follow-up demand.
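As a rough illustration of that follow-up, here is a hypothetical pure-Python sketch that is not part of this PR. N worker threads each own one rolling file and pull batches from a bounded `queue.Queue`. Integers stand in for batches and their encoded sizes, and lists stand in for Parquet files. A real version would stream each file out as the single-writer path does. The bounded queue caps how many batches wait in memory at once.

```python
import queue
import threading


def rolling_worker(q, target, files, lock):
    """Hypothetical worker: owns one rolling 'file' at a time and rolls it once
    the accumulated (encoded) size reaches target. None is the stop sentinel."""
    current, current_size = [], 0
    while True:
        size = q.get()
        if size is None:
            break
        current.append(size)
        current_size += size
        if current_size >= target:
            with lock:
                files.append(current)
            current, current_size = [], 0
    if current:  # flush this worker's partially filled last file
        with lock:
            files.append(current)


def hybrid_write(batch_sizes, n_workers, target):
    q = queue.Queue(maxsize=2 * n_workers)  # bounded: producer blocks when writers lag
    files, lock = [], threading.Lock()
    workers = [
        threading.Thread(target=rolling_worker, args=(q, target, files, lock))
        for _ in range(n_workers)
    ]
    for w in workers:
        w.start()
    for size in batch_sizes:
        q.put(size)
    for _ in workers:
        q.put(None)  # one sentinel per worker, queued after all real items
    for w in workers:
        w.join()
    return files


# Integers stand in for record batches (their encoded sizes); lists stand in for files.
files = hybrid_write([4] * 20, n_workers=3, target=10)
```

The sketch also makes one cost of the design visible. Each worker flushes its own partial last file, so a run can end with up to N undersized files instead of one.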
   
   # Properties honored
   
   The streaming path honors the same parquet writer properties as the 
materialised `pa.Table` path:
   
   | Property | In Iceberg spec? | Honored by this PR? |
   |---|---|---|
   | `write.target-file-size-bytes` | ✅ | ✅ via `OutputStream.tell()`, on-disk compressed bytes |
   | `write.parquet.compression-codec` / `compression-level` | ✅ | ✅ |
   | `write.parquet.page-size-bytes` | ✅ | ✅ |
   | `write.parquet.page-row-limit` | ✅ | ✅ |
   | `write.parquet.dict-size-bytes` | ✅ | ✅ |
   | `write.parquet.row-group-limit` (pyiceberg-internal) | ❌ (not in spec) | ✅ as a per-row-group cap, identical treatment to the materialised path |
   | `write.parquet.row-group-size-bytes` | ✅ | ❌ pre-existing pyiceberg-wide gap, warned by `_get_parquet_writer_kwargs` for both paths; out of scope here |
   | `write.parquet.bloom-filter-*` | ✅ | ❌ pre-existing pyiceberg-wide gap |
   
   When someone fixes `write.parquet.row-group-size-bytes` for pyiceberg, both write paths benefit. This PR deliberately doesn't touch it, since the gap predates this PR series.
   
   # Code duplication note
   
   `_record_batches_to_data_files` shares some boilerplate with the 
materialised `write_file`'s nested `write_parquet` closure: 
parquet-writer-kwargs / row-group-size / location-provider extraction, 
`file_schema` selection, and `DataFile` construction from Parquet metadata. The 
shared module-level helpers (`_get_parquet_writer_kwargs`, 
`_to_requested_schema`, `data_file_statistics_from_parquet_metadata`, etc.) are 
reused, but the "compose these helpers in the standard pattern" wrapper lives 
independently in each path.
   
   Extraction is mechanical (~100 lines of pure refactor), but I'd prefer to land it as a standalone follow-up PR. It touches the existing `write_file` closure, which I'd rather not modify in the same PR as the new streaming implementation.
   
   # Are these changes tested?
   
   Yes, at four layers.
   
   ## 1. End-to-end behaviour tests (no Docker)
   
   `tests/catalog/test_catalog_behaviors.py` — 10 tests parametrised across all 
three in-process catalog backends (`memory`, `sql`, `sql_without_rowcount`) → 
**30 test runs**:
   
   - `test_append_record_batch_reader` — basic append round-trip.
   - `test_append_record_batch_reader_microbatched` — multi-file rollover via 
`target-file-size-bytes=1`.
   - `test_append_record_batch_reader_row_group_limit_is_cap` — feeds a single 
1,000-row batch, sets `row-group-limit=250`, asserts the resulting Parquet has 
exactly 4 row groups of 250 rows each (verified by reading the Parquet footer 
with `pq.read_metadata`).
   - `test_append_record_batch_reader_target_file_size_is_on_disk_bytes` — sets 
`target-file-size-bytes=32 KiB`, streams ~12 MiB, asserts each rolled file is 
between 0.5× and 5× the target. Catches regression to the old 
uncompressed-Arrow-bytes behaviour (which would produce files ~3-10× *smaller* 
than target).
   - `test_append_record_batch_reader_empty` — empty reader produces zero data 
files.
   - `test_overwrite_record_batch_reader` — overwrite via reader replaces 
existing rows.
   - `test_append_record_batch_reader_to_partitioned_table_raises` — 
partitioned-table input raises `NotImplementedError`.
   - `test_append_invalid_input_type_raises` — non-Arrow input rejected.
   - `test_record_batch_reader_consumed_exactly_once` — reader generator 
drained once; no double-pass regression.
   - `test_record_batch_reader_schema_mismatch_writes_no_files` — schema 
mismatch fails before any data files are written (no orphan files).
   
   ## 2. Spark integration tests
   
   `tests/integration/test_writes/test_writes.py`: 3 tests, each parametrised over format versions 1 and 2 (6 runs), proving Spark can read tables written via the streaming path against the docker-compose stack:
   
   - `test_append_record_batch_reader[1, 2]`
   - `test_overwrite_record_batch_reader[1, 2]`
   - `test_append_record_batch_reader_multifile[1, 2]`
   
   ## 3. Local CI sweep
   
   - `make test` (full unit suite): **3,650 passed**, 0 failed, lint + mypy + 
pydocstyle clean.
   - `make test-integration` (full Spark integration suite on fresh 
docker-compose): **396 passed, 1 skipped, 0 failed** in 3:47.
   
   ## 4. Real-stack smoke test on AWS
   
   Verified end-to-end against AWS Glue + S3 in our staging account:
   
   - 5M-row × 108-byte-per-row streaming append (515 MiB uncompressed → 390 MiB on disk in 24 files), 19 Hz RSS sampling: peak 236 MiB, mean 167 MiB, no growth from start to finish (chart above).
   - The same scripts that backed PR #3335 
(`smoke_test_record_batch_reader.py`, `smoke_test_athena_readback.py`) pass 
unchanged on this branch — Athena `COUNT(*)` and `MAX(id)` match input on a 
streaming-written table.
   
   # Are there any user-facing changes?
   
   Effectively none beyond what #3335 already introduced — this PR changes 
internals only:
   
   - **`write.target-file-size-bytes` semantics tighten for streaming inputs.** 
A user who set this property on a streaming-write workflow under #3335 was 
getting files 3-10× smaller than configured (uncompressed Arrow bytes proxy). 
With this PR the property now reflects actual on-disk compressed bytes — files 
become correspondingly larger. This is a net win and matches the spec, but 
worth noting for anyone who calibrated batch sizes around the old behaviour.
   - **`bin_pack_record_batches` helper removed** from `pyiceberg.io.pyarrow`. 
It was added in #3335 (so it's never been in a release) and its only consumer 
was `_dataframe_to_data_files`'s streaming branch, which is now restructured.
   
   The public `Table.append(reader)` / `Table.overwrite(reader)` API and its 
docstring guarantees are unchanged.
   
   # Related
   
   - Builds on #3335 (must land first)
   - Uses `OutputStream.tell()` from #2998 (already merged)
   - Closes the byte-semantics caveat documented in #3335
   - Reference implementation: iceberg-go#369 (single-writer streaming, same 
model)
   - Aligned with Java/Spark/Flink interpretation of 
`write.target-file-size-bytes`
   - Tracks https://github.com/apache/iceberg-python/issues/2152

