jecsand838 opened a new pull request, #8100: URL: https://github.com/apache/arrow-rs/pull/8100
# Which issue does this PR close?

- Part of https://github.com/apache/arrow-rs/issues/4886

# Rationale for this change

Decoding Avro **single-object encoded** streams was brittle when data arrived in partial chunks (e.g., from async or networked sources). The old implementation relied on ad-hoc prefix handling and assumed a full record would be available, producing hard errors for otherwise normal "incomplete buffer" situations. Additionally, the Avro OCF (Object Container File) path iterated record by record through a shared row decoder, adding overhead.

This PR introduces a small state machine for single-object decoding and a block-aware path for OCF, making streaming more robust and OCF decoding more efficient while preserving the public API surface.

# What changes are included in this PR?

**Single-object decoding (streaming)**

- Replace ad-hoc prefix parsing (`expect_prefix`, `handle_prefix`, `handle_fingerprint`) with an explicit state machine:
  - New `enum DecoderState { Magic, Fingerprint, Record, SchemaChange, Finished }`.
  - `Decoder` now tracks `state`, `bytes_remaining`, and a `fingerprint_buf` to incrementally assemble the fingerprint.
- New helper `is_incomplete_data(&ArrowError) -> bool` to treat "Unexpected EOF", "bad varint", and "offset overflow" as *incomplete input* instead of fatal errors.
- Reworked `Decoder::decode(&[u8]) -> Result<usize, ArrowError>`:
  - Consumes data according to the state machine.
  - Returns cleanly when more bytes are needed (no spurious errors for partial chunks).
  - Defers schema switching until after flushing currently decoded rows.
- Updated `Decoder::flush()` to emit a batch only when rows are ready and to transition the state correctly (including a staged `SchemaChange`).
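The state machine and error classification above can be sketched as follows. This is a simplified illustration, not the crate's actual code: the real implementation matches on `arrow_schema::ArrowError`, while this sketch classifies a plain message string, and the `next` transition function is an assumed happy-path summary of the transitions described in this PR.

```rust
/// States of the single-object decoder (simplified sketch of the enum above).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DecoderState {
    Magic,        // expecting the two-byte single-object magic (0xC3 0x01)
    Fingerprint,  // incrementally assembling the 8-byte schema fingerprint
    Record,       // decoding record bodies
    SchemaChange, // new fingerprint observed; swap schemas after a flush
    Finished,
}

/// Happy-path transition order for a fresh stream (illustrative only).
fn next(state: DecoderState) -> DecoderState {
    match state {
        DecoderState::Magic => DecoderState::Fingerprint,
        DecoderState::Fingerprint => DecoderState::Record,
        DecoderState::Record => DecoderState::Record, // stay until flush/change
        DecoderState::SchemaChange => DecoderState::Record,
        DecoderState::Finished => DecoderState::Finished,
    }
}

/// Errors that mean "the caller simply has not supplied enough bytes yet".
/// The message fragments mirror the ones listed in the PR description.
fn is_incomplete_data(msg: &str) -> bool {
    msg.contains("Unexpected EOF")
        || msg.contains("bad varint")
        || msg.contains("offset overflow")
}

fn main() {
    // A truncated buffer surfaces as an EOF-style error, which the state
    // machine treats as "wait for more input" rather than failing hard.
    assert!(is_incomplete_data("Unexpected EOF reading fingerprint"));
    assert!(!is_incomplete_data("invalid magic bytes"));
    assert_eq!(next(DecoderState::Magic), DecoderState::Fingerprint);
    assert_eq!(next(DecoderState::Fingerprint), DecoderState::Record);
    println!("ok");
}
```

The key design point is that "incomplete input" is a normal state transition, not an error path: the decoder parks in its current state and resumes when the caller supplies more bytes.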
**OCF (Object Container File) decoding**

- Add block-aware decoding methods on `Decoder`, used by `Reader`:
  - `decode_block(&[u8], count: usize) -> Result<(consumed, records_decoded), ArrowError>`
  - `flush_block() -> Result<Option<RecordBatch>, ArrowError>`
- `Reader` now tracks `block_count` and decodes up to the number of records in the current block, reducing per-row overhead and improving throughput.
- `ReaderBuilder::build` initializes the new `block_count` path.

**API / struct adjustments**

- Remove the internal `expect_prefix` flag from `Decoder`; behavior is driven by the state machine.
- `ReaderBuilder::make_decoder_with_parts` updated accordingly (no behavior change to public builder methods).
- No public API signature changes for `Reader`, `Decoder`, or `ReaderBuilder`.

**Tests**

- Add targeted streaming tests:
  - `test_two_messages_same_schema`
  - `test_two_messages_schema_switch`
  - `test_split_message_across_chunks`
- Update prefix-handling tests to validate state transitions (`Magic` → `Fingerprint`, etc.) and new error messages.
- Retain and exercise existing suites (types, lists, nested structures, decimals, enums, strict mode) with minimal adjustments.

# Are these changes tested?

Yes.

- New unit tests cover:
  - Multi-message streams with and without schema switches
  - Messages split across chunk boundaries
  - Incremental prefix/fingerprint parsing
- Existing tests continue to cover OCF reading, compression, complex/nested types, strict mode, etc.
- The new OCF path is exercised by the unchanged OCF tests, since `Reader` now uses `decode_block`/`flush_block`.

# Are there any user-facing changes?

No. Public API signatures are unchanged; partial input is now handled gracefully instead of producing errors.
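The incremental contract exercised by tests like `test_split_message_across_chunks` — consume only what can be fully parsed, return without error on partial input, and let the caller re-feed the remainder — can be illustrated with a self-contained toy. Note this is a hedged sketch: `ToyDecoder` and its two-byte-magic, length-prefixed frame format are stand-ins, not real Avro single-object encoding, and the real `Decoder::decode` returns `Result<usize, ArrowError>` over Avro-encoded bytes.

```rust
/// Toy incremental decoder: each frame is [0xC3, 0x01, len, payload...].
/// Illustrative only; real Avro single-object framing differs.
struct ToyDecoder {
    got_magic: bool,       // persists across decode() calls, like DecoderState
    records: Vec<Vec<u8>>, // fully decoded payloads
}

impl ToyDecoder {
    fn new() -> Self {
        Self { got_magic: false, records: Vec::new() }
    }

    /// Returns the number of bytes consumed. Never consumes a partial
    /// record: on incomplete input it returns Ok cleanly, and the caller
    /// re-feeds the unconsumed tail together with the next chunk.
    fn decode(&mut self, data: &[u8]) -> Result<usize, String> {
        let mut pos = 0;
        loop {
            if !self.got_magic {
                if data.len() - pos < 2 {
                    return Ok(pos); // need more bytes for the magic
                }
                if data[pos..pos + 2] != [0xC3, 0x01] {
                    return Err("invalid magic bytes".to_string());
                }
                self.got_magic = true; // state survives chunk boundaries
                pos += 2;
            }
            if data.len() - pos < 1 {
                return Ok(pos); // need the length byte
            }
            let len = data[pos] as usize;
            if data.len() - pos - 1 < len {
                return Ok(pos); // partial record body: wait for more input
            }
            self.records.push(data[pos + 1..pos + 1 + len].to_vec());
            pos += 1 + len;
            self.got_magic = false; // next frame starts with magic again
        }
    }
}

fn main() {
    let msg = [0xC3u8, 0x01, 3, b'a', b'b', b'c'];
    let mut dec = ToyDecoder::new();
    // Split the message across two chunks: the first call consumes only
    // the magic, then parks without error on the truncated record.
    let consumed = dec.decode(&msg[..4]).unwrap();
    let rest = &msg[consumed..];
    dec.decode(rest).unwrap();
    assert_eq!(dec.records, vec![b"abc".to_vec()]);
    println!("records: {:?}", dec.records);
}
```

The point mirrored from the PR is the return-value contract: a short read is signaled by the consumed-byte count rather than by an error, so async and networked callers can loop on `decode` without special-casing chunk boundaries.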
