jecsand838 opened a new pull request, #8100:
URL: https://github.com/apache/arrow-rs/pull/8100

   # Which issue does this PR close?
   
   - Part of https://github.com/apache/arrow-rs/issues/4886
   
   # Rationale for this change
   
   Decoding Avro **single-object encoded** streams was brittle when data 
arrived in partial chunks (e.g., from async or networked sources). The old 
implementation relied on ad‑hoc prefix handling and assumed a full record would 
be available, producing hard errors for otherwise normal “incomplete buffer” 
situations. Additionally, the Avro OCF (Object Container File) path iterated 
record‑by‑record through a shared row decoder, adding avoidable per‑record 
overhead.
   
   This PR introduces a small state machine for single‑object decoding and a 
block‑aware path for OCF, making streaming more robust and OCF decoding more 
efficient while preserving the public API surface.
   
   # What changes are included in this PR?
   
   **Single‑object decoding (streaming)**
   - Replace ad‑hoc prefix parsing (`expect_prefix`, `handle_prefix`, 
`handle_fingerprint`) with an explicit state machine (sketched after this 
list):
     - New `enum DecoderState { Magic, Fingerprint, Record, SchemaChange, 
Finished }`.
     - `Decoder` now tracks `state`, `bytes_remaining`, and a `fingerprint_buf` 
to incrementally assemble the fingerprint.
   - New helper `is_incomplete_data(&ArrowError) -> bool` to treat “Unexpected 
EOF”, “bad varint”, and “offset overflow” as *incomplete input* instead of 
fatal errors.
   - Reworked `Decoder::decode(&[u8]) -> Result<usize, ArrowError>`:
     - Consumes data according to the state machine.
     - Cleanly returns when more bytes are needed (no spurious errors for 
partial chunks).
     - Defers schema switching until after flushing currently decoded rows.
   - Updated `Decoder::flush()` to emit a batch only when rows are ready and to 
transition the state correctly (including a staged `SchemaChange`).
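   
   For illustration, here is a minimal, self‑contained sketch of the state 
machine described above. `DecoderState` and `fingerprint_buf` mirror names 
from the PR; `active_fingerprint`, the stubbed record path, and `String` in 
place of `ArrowError` are stand‑ins, and the real decoder also tracks 
`bytes_remaining` and integrates with arrow‑avro's row decoder.
   
```rust
/// Phases of single-object decoding; `decode` advances through them.
enum DecoderState {
    Magic,        // expecting the 2-byte single-object magic (0xC3 0x01)
    Fingerprint,  // incrementally assembling the 8-byte schema fingerprint
    Record,       // decoding record bodies
    SchemaChange, // new fingerprint seen; switch schemas after flushing rows
    Finished,
}

struct Decoder {
    state: DecoderState,
    fingerprint_buf: Vec<u8>,
    active_fingerprint: Option<[u8; 8]>,
}

impl Decoder {
    fn new() -> Self {
        Self {
            state: DecoderState::Magic,
            fingerprint_buf: Vec::with_capacity(8),
            active_fingerprint: None,
        }
    }

    /// Consume as much of `data` as the current state allows and return the
    /// number of bytes consumed. Running out of input is *not* an error: the
    /// decoder stays in its current state until more bytes arrive.
    fn decode(&mut self, data: &[u8]) -> Result<usize, String> {
        let mut consumed = 0;
        loop {
            match self.state {
                DecoderState::Magic => {
                    if data.len() - consumed < 2 {
                        return Ok(consumed); // incomplete input; retry later
                    }
                    if data[consumed..consumed + 2] != [0xC3, 0x01] {
                        return Err("bad single-object magic".into());
                    }
                    consumed += 2;
                    self.state = DecoderState::Fingerprint;
                }
                DecoderState::Fingerprint => {
                    let need = 8 - self.fingerprint_buf.len();
                    let take = need.min(data.len() - consumed);
                    self.fingerprint_buf
                        .extend_from_slice(&data[consumed..consumed + take]);
                    consumed += take;
                    if self.fingerprint_buf.len() < 8 {
                        return Ok(consumed); // fingerprint still incomplete
                    }
                    let fp: [u8; 8] = self.fingerprint_buf[..].try_into().unwrap();
                    self.fingerprint_buf.clear();
                    // A changed fingerprint is staged rather than applied, so
                    // rows already decoded flush under the old schema first.
                    self.state = match self.active_fingerprint {
                        Some(prev) if prev != fp => DecoderState::SchemaChange,
                        _ => DecoderState::Record,
                    };
                    self.active_fingerprint = Some(fp);
                }
                _ => {
                    // Record decoding elided; the real implementation decodes
                    // row bodies here, using `is_incomplete_data` to separate
                    // truncated input from genuinely malformed bytes.
                    return Ok(consumed);
                }
            }
        }
    }
}
```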
   
   **OCF (Object Container File) decoding**
   - Add block‑aware decoding methods on `Decoder` used by `Reader` (a sketch 
follows this list):
     - `decode_block(&[u8], count: usize) -> Result<(usize, usize), 
ArrowError>`, returning `(bytes_consumed, records_decoded)`
     - `flush_block() -> Result<Option<RecordBatch>, ArrowError>`
   - `Reader` now tracks `block_count` and decodes up to the number of records 
in the current block, reducing per‑row overhead and improving throughput.
   - `ReaderBuilder::build` initializes the new `block_count` path.
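   
   As a rough illustration of the shape of this path: in the PR these methods 
live on the same `Decoder`, but the stand‑alone `OcfDecoder` and 
`RecordBatchStub` below are illustrative stand‑ins (with stubbed bodies, and 
`String` in place of `ArrowError`) so the sketch stays self‑contained.
   
```rust
struct RecordBatchStub; // stand-in for arrow's RecordBatch

struct OcfDecoder {
    pending_rows: usize,
}

impl OcfDecoder {
    /// Decode up to `count` records from `data`, returning the bytes consumed
    /// and the records actually decoded (fewer if `data` ends mid-record).
    fn decode_block(&mut self, data: &[u8], count: usize) -> Result<(usize, usize), String> {
        // Stub accounting; the real implementation walks record bodies with
        // the shared row decoder, once per block rather than once per row.
        let decoded = count.min(data.len());
        self.pending_rows += decoded;
        Ok((decoded, decoded))
    }

    /// Emit the rows decoded so far as one batch, or `None` if none are ready.
    fn flush_block(&mut self) -> Result<Option<RecordBatchStub>, String> {
        if self.pending_rows == 0 {
            return Ok(None);
        }
        self.pending_rows = 0;
        Ok(Some(RecordBatchStub))
    }
}

struct Reader {
    decoder: OcfDecoder,
    block_count: usize, // records remaining in the current OCF block
}

impl Reader {
    /// Drain one block's records, then flush them as a single batch.
    fn read_block(&mut self, block_data: &[u8]) -> Result<Option<RecordBatchStub>, String> {
        let mut offset = 0;
        while self.block_count > 0 && offset < block_data.len() {
            let (consumed, decoded) =
                self.decoder.decode_block(&block_data[offset..], self.block_count)?;
            if consumed == 0 && decoded == 0 {
                break; // incomplete input; caller supplies more bytes
            }
            offset += consumed;
            self.block_count -= decoded;
        }
        self.decoder.flush_block()
    }
}
```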
   
   **API / struct adjustments**
   - Remove internal `expect_prefix` flag from `Decoder`; behavior is driven by 
the state machine.
   - `ReaderBuilder::make_decoder_with_parts` updated accordingly (no behavior 
change to public builder methods).
   - No public API signature changes for `Reader`, `Decoder`, or 
`ReaderBuilder`.
   
   **Tests**
   - Add targeted streaming tests (the split‑chunk scenario is sketched after 
this list):
     - `test_two_messages_same_schema`
     - `test_two_messages_schema_switch`
     - `test_split_message_across_chunks`
   - Update prefix‑handling tests to validate state transitions (`Magic` → 
`Fingerprint`, etc.) and new error messages.
   - Retain and exercise existing suites (types, lists, nested structures, 
decimals, enums, strict mode) with minimal adjustments.
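   
   For a condensed picture of the split‑chunk scenario, here is what such a 
test can look like against the simplified `Decoder` sketch above (the PR's 
actual tests drive the real arrow‑avro decoder and assert on the flushed 
`RecordBatch`es):
   
```rust
#[test]
fn split_message_across_chunks() {
    // 2-byte magic plus 8-byte fingerprint, fed one byte at a time.
    let prefix = [0xC3, 0x01, 1, 2, 3, 4, 5, 6, 7, 8];
    let mut decoder = Decoder::new();
    let mut consumed = 0;
    for end in 1..=prefix.len() {
        // Each call consumes what it can; partial input never errors.
        consumed += decoder.decode(&prefix[consumed..end]).unwrap();
    }
    assert_eq!(consumed, prefix.len());
    assert!(matches!(decoder.state, DecoderState::Record));
}
```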
   
   # Are these changes tested?
   
   Yes.
   - New unit tests cover:
     - Multi‑message streams with/without schema switches
     - Messages split across chunk boundaries
     - Incremental prefix/fingerprint parsing
   - Existing tests continue to cover OCF reading, compression, complex/nested 
types, strict mode, etc.
   - The new OCF path is exercised by the unchanged OCF tests, since `Reader` 
now uses `decode_block`/`flush_block`.
   
   # Are there any user-facing changes?
   
   No. Public API signatures for `Reader`, `Decoder`, and `ReaderBuilder` are 
unchanged.
   
   

