zhuqi-lucas commented on PR #7401:
URL: https://github.com/apache/arrow-rs/pull/7401#issuecomment-2798744870
I found where the deadlock happens on the page cache branch:
1. The read loop calls `has_next` in its condition:
```rust
while total_records_read < max_records && self.has_next()? {
}
```
2. `has_next` then calls `read_new_page`:
```rust
#[inline]
pub(crate) fn has_next(&mut self) -> Result<bool> {
    if self.num_buffered_values == 0 || self.num_buffered_values == self.num_decoded_values {
        // TODO: should we return false if read_new_page() = true and
        // num_buffered_values = 0?
        println!(
            "num_buffered_values: {}, num_decoded_values: {}",
            self.num_buffered_values, self.num_decoded_values
        );
        if !self.read_new_page()? {
            Ok(false)
        } else {
            Ok(self.num_buffered_values != 0)
        }
    } else {
        Ok(true)
    }
}
```
3. Inside `read_new_page`, the loop never exits (this is the deadlock) because
the `Page::DictionaryPage` arm hits `continue` on every iteration:
```rust
/// Reads a new page and set up the decoders for levels, values or dictionary.
/// Returns false if there's no page left.
fn read_new_page(&mut self) -> Result<bool> {
    println!("GenericColumnReader read_new_page");
    loop {
        match self.page_reader.get_next_page()? {
            // No more page to read
            None => return Ok(false),
            Some(current_page) => {
                //println!("GenericColumnReader read_new_page current_page: {:?}", current_page.page_type());
                match current_page {
                    // 1. Dictionary page: configure dictionary for this page.
                    Page::DictionaryPage {
                        buf,
                        num_values,
                        encoding,
                        is_sorted,
                    } => {
                        self.values_decoder
                            .set_dict(buf, num_values, encoding, is_sorted)?;
                        continue;
                    }
                    // ... remaining page types omitted from this excerpt
                }
            }
        }
    }
}
```
4. The root cause is that `get_next_page` always returns the cached dictionary
page in the logic below. This is a corner case that this benchmark hits on the
page cache branch:
```rust
impl<R: ChunkReader> PageReader for CachedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>, ParquetError> {
        //println!("CachedPageReader get next page");
        let next_page_offset = self.inner.peek_next_page_offset()?;
        //println!("CachedPageReader next page offset: {:?}", next_page_offset);
        let Some(offset) = next_page_offset else {
            return Ok(None);
        };
        let mut cache = self.cache.get();
        let page = cache.get_page(self.col_id, offset);
        if let Some(page) = page {
            self.inner.skip_next_page()?;
            //println!("CachedPageReader skip next page");
            Ok(Some(page))
        } else {
            //println!("CachedPageReader insert page");
            let inner_page = self.inner.get_next_page()?;
            let Some(inner_page) = inner_page else {
                return Ok(None);
            };
            cache.insert_page(self.col_id, offset, inner_page.clone());
            Ok(Some(inner_page))
        }
    }
}
```
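
To show the failure mode from steps 3 and 4 in isolation, here is a minimal sketch. Everything in it is a hypothetical stand-in rather than the arrow-rs code: the `Page` enum, the `PageReader` trait, `StuckReader`, and `VecReader` are simplified for illustration, and the `max_iters` cap is added only so the sketch terminates. `StuckReader` models only the symptom described above (the same dictionary page comes back on every call); it does not model why the cache does that.

```rust
/// Hypothetical, simplified page type (not the parquet crate's `Page`).
#[derive(Clone, Debug, PartialEq)]
enum Page {
    Dictionary,
    Data(u32),
}

/// Hypothetical, simplified reader trait (not the parquet crate's `PageReader`).
trait PageReader {
    fn get_next_page(&mut self) -> Option<Page>;
}

/// Models the reported symptom: the same dictionary page is returned forever.
struct StuckReader;

impl PageReader for StuckReader {
    fn get_next_page(&mut self) -> Option<Page> {
        Some(Page::Dictionary)
    }
}

/// A well-behaved reader that advances through its pages and then ends.
struct VecReader {
    pages: std::vec::IntoIter<Page>,
}

impl PageReader for VecReader {
    fn get_next_page(&mut self) -> Option<Page> {
        self.pages.next()
    }
}

/// Same control flow as the loop in step 3: a dictionary page means `continue`.
/// The iteration cap is an addition for this sketch, so a stuck reader is
/// reported as an error instead of spinning forever.
fn read_new_page<R: PageReader>(reader: &mut R, max_iters: usize) -> Result<bool, String> {
    for _ in 0..max_iters {
        match reader.get_next_page() {
            // No more pages to read.
            None => return Ok(false),
            // Dictionary page: nothing to decode yet, ask for the next page.
            Some(Page::Dictionary) => continue,
            // Data page: the caller can start decoding values.
            Some(Page::Data(_)) => return Ok(true),
        }
    }
    Err(format!("no data page after {max_iters} iterations"))
}

fn main() {
    let mut ok = VecReader {
        pages: vec![Page::Dictionary, Page::Data(10)].into_iter(),
    };
    println!("advancing reader: {:?}", read_new_page(&mut ok, 1000));

    let mut stuck = StuckReader;
    println!("stuck reader: {:?}", read_new_page(&mut stuck, 1000));
}
```

With the advancing reader the loop reaches the data page right after the dictionary page. With the stuck reader every iteration takes the `continue` branch, which is the behaviour that shows up as a hang when there is no cap.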