jroddev opened a new issue, #6581: URL: https://github.com/apache/arrow-rs/issues/6581
**Which part is this question about** Parquet, Arrow, Async Trying to adapt [external_metadata](https://github.com/apache/arrow-rs/blob/master/parquet/examples/external_metadata.rs) example with an custom AsyncFileReader **Describe your question** Not sure if this is a bug or a problem with my implementation. I use [this parquet file](https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet) but have also tested with others and get the same error. The error that I am seeing is ```rust ArrowError("Parquet argument error: External: protocol error") ``` and after some digging I found the underlying error is ```rust ParquetError::External from thrift ProtocolError { kind: Unknown, message: "missing required field PageHeader.type_", } ``` This is happening because the first ident hit in [thrift read_from_in_protocol](https://github.com/apache/arrow-rs/blob/9485897ccb6da955a3efeba84e552e85d4efaa20/parquet/src/format.rs#L696) is of type `TType::Stop` so it returns and `type_` is never set. When the metadata is initially parsed there is `i32`, `list`, `i32`, `string`, `i32` before it hits the first `TType::Stop` so it seems like it isn't starting from the start? **_Do I need to reset it back to the beginning somehow?_** **Context** This is the CustomAsyncFileReader. The eventual plan is to have this reading from an API so that I can page through RowGroups without loading them all into memory at once. ```rust struct CustomAsyncFileReader { pub file_path: String, pub parquet_metadata: Arc<ParquetMetaData>, } impl AsyncFileReader for CustomAsyncFileReader { fn get_bytes(&mut self, range: std::ops::Range<usize>) -> BoxFuture<'_, Result<Bytes>> { async move { let length = range.end - range.start; let mut file = File::open(&self.file_path).await.unwrap(); let _ = file.seek(io::SeekFrom::Start(range.start as u64)); let mut buffer = vec![0; length]; match file.read_exact(&mut buffer).await { Ok(bytes) => Ok(Bytes::from(buffer)) Err(e) => Err(ParquetError::General(e.to_string())) } } .boxed() } fn get_metadata( &mut self, ) -> BoxFuture<'_, Result<Arc<parquet::file::metadata::ParquetMetaData>>> { async move { Ok(self.parquet_metadata.clone()) }.boxed() } } ``` This is the processing of the rows. The println is where the error is shown ```rust async fn process_parquet_stream( remote_reader: CustomAsyncFileReader, metadata: Arc<ParquetMetaData>, ) { let options = ArrowReaderOptions::new().with_page_index(true); let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.clone().into(), options).unwrap(); let reader = ParquetRecordBatchStreamBuilder::new_with_metadata(remote_reader, arrow_reader_metadata) .build() .unwrap(); reader .for_each(|row_group| async move { println!("process row group: {:#?}", row_group); sleep(Duration::from_secs(1)).await; }) .await; } ``` Finally this is how I'm reading the metadata in the main function. I had tried a few other variants but they all result in the same error. ```rust #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { let current_dir = env::current_dir().unwrap(); let path = format!( "{}/data/green_tripdata_2024-01.parquet", current_dir.display() ); let mut file = File::open(&path).await.unwrap(); let file_size = file.metadata().await.unwrap().len(); let metadata = Arc::new( ParquetMetaDataReader::new() .with_page_indexes(true) .load_and_finish(&mut file, file_size as usize) .await .unwrap(), ); let remote_reader = CustomAsyncFileReader { file_path: path.to_string(), parquet_metadata: metadata.clone(), }; process_parquet_stream(remote_reader, metadata).await; Ok(()) } ``` Any help is greatly appreciated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
