zeroshade opened a new pull request, #3870:
URL: https://github.com/apache/arrow-adbc/pull/3870

   ## Fix Critical Deadlocks and Race Conditions in Snowflake Record Reader
   
   This PR addresses multiple critical concurrency issues in the Snowflake 
driver's `recordReader` that could hang the application completely under 
ordinary race conditions.
   
   ### Issues Fixed
   
   *1. Critical Deadlock: `Release()` Blocking Forever*
   
   *Problem:* When `Release()` was called while producer goroutines were 
blocked on channel sends, a permanent deadlock occurred:
   
   * `Release()` cancels context and attempts to drain channels
   * Producer goroutines blocked on `ch <- rec` cannot see the cancellation
   * Channels never close because producers never exit
   * `Release()` blocks forever on `for rec := range ch`
   
   *Fix:* Added a `done` channel that signals when all producer goroutines have 
completed. `Release()` now waits for this signal before attempting to drain 
channels.
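   
   A minimal sketch of this pattern, under stated assumptions: `reader`, 
`newReader`, and the `int` payload are hypothetical stand-ins, not the driver's 
actual types. A `sync.WaitGroup` tracks the producers, a helper goroutine 
closes `done` once they have all returned, and `Release()` cancels, waits on 
`done`, then drains. The sketch depends on context-aware sends; without them 
the producers would never exit and `done` would never close.
   
   ```go
   package main

   import (
       "context"
       "fmt"
       "sync"
   )

   // reader is a hypothetical stand-in for the driver's record reader.
   type reader struct {
       chs    []chan int         // one buffered channel per producer
       done   chan struct{}      // closed once every producer has returned
       cancel context.CancelFunc // stops the producers
   }

   func newReader(nProducers, perProducer, bufferSize int) *reader {
       ctx, cancel := context.WithCancel(context.Background())
       r := &reader{
           chs:    make([]chan int, nProducers),
           done:   make(chan struct{}),
           cancel: cancel,
       }
       for i := range r.chs {
           r.chs[i] = make(chan int, bufferSize)
       }

       var wg sync.WaitGroup
       for i := range r.chs {
           wg.Add(1)
           go func(ch chan<- int) {
               defer wg.Done()
               defer close(ch) // lets Release's range loop terminate
               for v := 0; v < perProducer; v++ {
                   select {
                   case ch <- v:
                   case <-ctx.Done():
                       return
                   }
               }
           }(r.chs[i])
       }

       // Signal completion only after all producers have finished.
       go func() {
           wg.Wait()
           close(r.done)
       }()
       return r
   }

   // Release cancels the producers, waits for them to exit, and then drains
   // whatever is still buffered. It returns the number of drained values.
   func (r *reader) Release() int {
       r.cancel()
       <-r.done
       drained := 0
       for _, ch := range r.chs {
           for range ch {
               drained++
           }
       }
       return drained
   }

   func main() {
       r := newReader(3, 100, 1)
       fmt.Println("drained:", r.Release())
   }
   ```
   
   Because every channel is closed by its producer before `done` is closed, 
the drain loop in `Release()` always terminates.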
   
   *2. Severe Deadlock: Non-Context-Aware Channel Sends*
   
   *Problem:* Channel send operations at lines 694 and 732 checked the context 
before the send but not while blocked on it:
   
   ```go
   for rr.Next() && ctx.Err() == nil {  // Context checked here
       // ... 
       ch <- rec  // But send blocks here without checking context
   }
   ```
   
   *Fix:* Wrapped all channel sends in `select` statements with context 
awareness:
   
   ```go
   select {
   case chs[0] <- rec:
       // Successfully sent
   case <-ctx.Done():
       rec.Release()
       return ctx.Err()
   }
   ```
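   
   To see the behaviour in isolation, here is a self-contained sketch. The 
names `releasable`, `fakeRecord`, and `sendOrRelease` are hypothetical and 
exist only for illustration; they are not part of the driver or of Arrow.
   
   ```go
   package main

   import (
       "context"
       "errors"
       "fmt"
   )

   // releasable is a hypothetical stand-in for a record with a Release method.
   type releasable interface{ Release() }

   // fakeRecord remembers whether it has been released.
   type fakeRecord struct{ released bool }

   func (f *fakeRecord) Release() { f.released = true }

   // sendOrRelease sends rec on ch, or gives up as soon as ctx is cancelled.
   // On cancellation it releases rec so the record is not leaked.
   func sendOrRelease(ctx context.Context, ch chan<- releasable, rec releasable) error {
       select {
       case ch <- rec:
           return nil
       case <-ctx.Done():
           rec.Release()
           return ctx.Err()
       }
   }

   func main() {
       ctx, cancel := context.WithCancel(context.Background())
       ch := make(chan releasable, 1)
       first, second := &fakeRecord{}, &fakeRecord{}

       // The first record fits in the buffer.
       fmt.Println(sendOrRelease(ctx, ch, first)) // <nil>

       // The buffer is full and nobody is receiving. A bare `ch <- second`
       // would block forever; with select, cancellation unblocks the send.
       cancel()
       err := sendOrRelease(ctx, ch, second)
       fmt.Println(errors.Is(err, context.Canceled), second.released) // true true
   }
   ```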
   
   *3. Critical Race Condition: Nil Channel Reads*
   
   *Problem:* Channels were created asynchronously in goroutines after 
`newRecordReader` returned. If `Next()` was called quickly after creation, it 
could read from uninitialized (nil) channels, causing infinite blocking.
   
   *Fix:* Initialize all channels upfront before starting any goroutines:
   
   ```go
   chs := make([]chan arrow.RecordBatch, len(batches))
   for i := range chs {
       chs[i] = make(chan arrow.RecordBatch, bufferSize)
   }
   ```
   
   *4. Goroutine Leaks on Initialization Errors*
   
   *Problem:* Error paths only cleaned up the first channel, potentially 
leaking goroutines if initialization failed after starting concurrent 
operations.
   
   *Fix:* Moved all error-prone initialization (GetStream, NewReader) before 
goroutine creation, and added proper cleanup on errors.
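   
   The ordering can be sketched as follows. This is an illustration under 
stated assumptions, not the driver's code: `fakeStream`, `openStream`, and 
`startProducers` are hypothetical, with `openStream` standing in for the 
fallible setup calls, and closing already-opened streams on failure is one 
plausible reading of "proper cleanup".
   
   ```go
   package main

   import (
       "errors"
       "fmt"
   )

   // fakeStream is a hypothetical resource that must be closed after use.
   type fakeStream struct {
       id     int
       closed bool
   }

   func (s *fakeStream) Close() { s.closed = true }

   var errOpen = errors.New("cannot open stream")

   // openStream stands in for the fallible setup step; it fails for negative ids.
   func openStream(id int) (*fakeStream, error) {
       if id < 0 {
           return nil, fmt.Errorf("batch %d: %w", id, errOpen)
       }
       return &fakeStream{id: id}, nil
   }

   // startProducers performs every fallible step first. If one fails, it closes
   // what was opened and returns before any goroutine has been started.
   func startProducers(ids []int) ([]chan int, error) {
       streams := make([]*fakeStream, 0, len(ids))
       for _, id := range ids {
           s, err := openStream(id)
           if err != nil {
               for _, opened := range streams {
                   opened.Close()
               }
               return nil, err
           }
           streams = append(streams, s)
       }

       chs := make([]chan int, len(streams))
       for i := range chs {
           chs[i] = make(chan int, 1)
       }
       // From here on nothing can fail, so no producer can be orphaned.
       for i, s := range streams {
           go func(ch chan<- int, s *fakeStream) {
               defer close(ch)
               defer s.Close()
               ch <- s.id
           }(chs[i], s)
       }
       return chs, nil
   }

   func main() {
       if _, err := startProducers([]int{1, -1, 3}); err != nil {
           fmt.Println("init failed before any goroutine started:", err)
       }
       chs, _ := startProducers([]int{1, 2, 3})
       for _, ch := range chs {
           fmt.Println(<-ch)
       }
   }
   ```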
   
   ----------------------
   
   #### Changes
   
   * Added `done` channel to `reader` struct to signal goroutine completion
   * Initialize all channels upfront to eliminate race conditions
   * Use context-aware sends with `select` statements for all channel operations
   * Update `Release()` to wait on `done` channel before draining
   * Reorganize initialization to handle errors before starting goroutines
   * Signal completion by closing `done` channel after all producers finish
   
   #### Reproduction Scenarios Prevented
   
   *Deadlock #1:*
   
   1. `bufferSize = 1`, producer generates 2 records quickly
   2. Channel becomes full after first record
   3. Producer blocks on send
   4. Consumer calls `Release()` before `Next()`
   5. Without fix: permanent deadlock
   6. With fix: producer responds to cancellation, `Release()` completes
   
   *Race Condition:*
   
   1. Query returns 3 batches
   2. First batch processes quickly
   3. `Next()` advances to second channel
   4. Without fix: reads from nil channel, blocks forever
   5. With fix: channel already initialized, works correctly
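   
   The race scenario above rests on a Go language rule: a receive from a nil 
channel never becomes ready. The following standalone sketch demonstrates it 
with a timeout; `recvWithin` is a hypothetical helper written for this 
example, not driver code.
   
   ```go
   package main

   import (
       "fmt"
       "time"
   )

   // recvWithin reports whether a value arrives on ch before the timeout.
   func recvWithin(ch <-chan int, d time.Duration) (int, bool) {
       select {
       case v := <-ch:
           return v, true
       case <-time.After(d):
           return 0, false
       }
   }

   func main() {
       // A slice of channels holds only nil entries until each one is made.
       lazy := make([]chan int, 3)
       _, ok := recvWithin(lazy[1], 50*time.Millisecond)
       fmt.Println("nil channel delivered a value:", ok) // false

       // Creating every channel upfront removes the window entirely.
       eager := make([]chan int, 3)
       for i := range eager {
           eager[i] = make(chan int, 1)
       }
       eager[1] <- 42
       v, ok := recvWithin(eager[1], 50*time.Millisecond)
       fmt.Println(v, ok) // 42 true
   }
   ```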
   
   Potentially fixes #3730.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.