jroddev opened a new issue, #6581:
URL: https://github.com/apache/arrow-rs/issues/6581

   **Which part is this question about**
   Parquet, Arrow, Async
   Trying to adapt the [external_metadata](https://github.com/apache/arrow-rs/blob/master/parquet/examples/external_metadata.rs) example with a custom `AsyncFileReader`
   
   **Describe your question**
   I'm not sure whether this is a bug or a problem with my implementation.
   I am using [this Parquet file](https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet), but I have also tested with others and get the same error.
   
   The error that I am seeing is
   ```rust
   ArrowError("Parquet argument error: External: protocol error")
   ```
   and after some digging I found the underlying error is
   ```rust
   ParquetError::External from thrift ProtocolError {
       kind: Unknown,
       message: "missing required field PageHeader.type_",
   }
   ```
   This happens because the first field identifier read in [thrift read_from_in_protocol](https://github.com/apache/arrow-rs/blob/9485897ccb6da955a3efeba84e552e85d4efaa20/parquet/src/format.rs#L696) has type `TType::Stop`, so the function returns and `type_` is never set. When the metadata is initially parsed, the reader sees `i32`, `list`, `i32`, `string`, `i32` before it hits the first `TType::Stop`, so it seems like the read isn't starting from the start? **_Do I need to reset it back to the beginning somehow?_**
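
   To make the failure mode concrete, here is a simplified model of how a generated Thrift struct reader behaves. It is only a sketch: `FieldType`, `PageHeaderModel` and `read_page_header` are hypothetical names and not the code in `format.rs`. It assumes fields are read in a loop until a Stop marker and that required fields are checked only afterwards.

   ```rust
   // Simplified, hypothetical model of a generated Thrift struct reader.
   // None of these types come from the parquet or thrift crates.
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum FieldType {
       Stop,
       I32,
   }

   #[derive(Debug, PartialEq)]
   struct PageHeaderModel {
       type_: i32,
   }

   /// Consumes (field type, field id, value) triples until a Stop marker.
   fn read_page_header(fields: &[(FieldType, i16, i32)]) -> Result<PageHeaderModel, String> {
       let mut type_: Option<i32> = None;
       for &(field_type, id, value) in fields {
           if field_type == FieldType::Stop {
               break; // end of struct: nothing after this is read
           }
           if id == 1 {
               type_ = Some(value);
           }
       }
       // Required fields are verified only after the loop has finished.
       match type_ {
           Some(type_) => Ok(PageHeaderModel { type_ }),
           None => Err("missing required field PageHeader.type_".to_string()),
       }
   }
   ```

   In this model, an input of `[(FieldType::I32, 1, 0), (FieldType::Stop, 0, 0)]` yields a header, while an input that begins with `(FieldType::Stop, 0, 0)` yields the "missing required field" error. That is what I would expect if decoding starts at a byte that is not really the beginning of a page header.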
   
   **Context**
   
   This is the `CustomAsyncFileReader`. The eventual plan is to have it read from an API so that I can page through RowGroups without loading them all into memory at once.
   ```rust
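   // Imports inferred from usage; tokio is assumed as the async runtime.
   use std::sync::Arc;

   use bytes::Bytes;
   use futures::future::BoxFuture;
   use futures::FutureExt;
   use parquet::arrow::async_reader::AsyncFileReader;
   use parquet::errors::{ParquetError, Result};
   use parquet::file::metadata::ParquetMetaData;
   use tokio::fs::File;
   use tokio::io::{self, AsyncReadExt, AsyncSeekExt};

   struct CustomAsyncFileReader {
       pub file_path: String,
       pub parquet_metadata: Arc<ParquetMetaData>,
   }
   
   impl AsyncFileReader for CustomAsyncFileReader {
       fn get_bytes(&mut self, range: std::ops::Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
           async move {
               let length = range.end - range.start;
               let mut file = File::open(&self.file_path).await.unwrap();
               // NOTE: the future returned by seek() is dropped here without being awaited.
               let _ = file.seek(io::SeekFrom::Start(range.start as u64));
               let mut buffer = vec![0; length];
               match file.read_exact(&mut buffer).await {
                   Ok(_) => Ok(Bytes::from(buffer)),
                   Err(e) => Err(ParquetError::General(e.to_string())),
               }
           }
           .boxed()
       }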
   
       fn get_metadata(
           &mut self,
       ) -> BoxFuture<'_, Result<Arc<parquet::file::metadata::ParquetMetaData>>> {
           async move { Ok(self.parquet_metadata.clone()) }.boxed()
       }
   }
   ```
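
   For comparison, this is a blocking, `std`-only sketch of what `get_bytes` is meant to do: seek to `range.start`, then read exactly `range.end - range.start` bytes. The helper name `read_range` is hypothetical. One difference it makes visible: here the seek completes before the read starts, whereas in the async version above the future returned by `file.seek(...)` is never awaited. Assuming `File` is `tokio::fs::File`, that future is dropped without running, so every read would begin at offset 0, which might explain why the page header bytes are not what the decoder expects.

   ```rust
   use std::fs::File;
   use std::io::{self, Read, Seek, SeekFrom};
   use std::ops::Range;
   use std::path::Path;

   /// Blocking sketch: return exactly the bytes in `range` of the file at `path`.
   fn read_range(path: &Path, range: Range<usize>) -> io::Result<Vec<u8>> {
       let mut file = File::open(path)?;
       // The seek must complete (and succeed) before the read starts;
       // otherwise the read begins at offset 0.
       file.seek(SeekFrom::Start(range.start as u64))?;
       let mut buffer = vec![0u8; range.end - range.start];
       // read_exact fails with UnexpectedEof if the range runs past the end of the file.
       file.read_exact(&mut buffer)?;
       Ok(buffer)
   }
   ```

   If that is the cause, the async equivalent would presumably be to `.await` the seek and propagate its error before calling `read_exact`, but I have not confirmed this.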
   
   This is where the rows are processed. The `println!` is where the error shows up.
   ```rust
   async fn process_parquet_stream(
       remote_reader: CustomAsyncFileReader,
       metadata: Arc<ParquetMetaData>,
   ) {
       let options = ArrowReaderOptions::new().with_page_index(true);
       let arrow_reader_metadata =
           ArrowReaderMetadata::try_new(metadata.clone().into(), options).unwrap();
       let reader =
           ParquetRecordBatchStreamBuilder::new_with_metadata(remote_reader, arrow_reader_metadata)
               .build()
               .unwrap();
       reader
           .for_each(|row_group| async move {
               println!("process row group: {:#?}", row_group);
               sleep(Duration::from_secs(1)).await;
           })
           .await;
   }
   ```
   
   Finally, this is how I'm reading the metadata in the main function. I tried a few other variants, but they all result in the same error.
   ```rust
   #[tokio::main(flavor = "current_thread")]
   async fn main() -> Result<()> {
       let current_dir = env::current_dir().unwrap();
       let path = format!(
           "{}/data/green_tripdata_2024-01.parquet",
           current_dir.display()
       );
       let mut file = File::open(&path).await.unwrap();
       let file_size = file.metadata().await.unwrap().len();
   
       let metadata = Arc::new(
           ParquetMetaDataReader::new()
               .with_page_indexes(true)
               .load_and_finish(&mut file, file_size as usize)
               .await
               .unwrap(),
       );
   
       let remote_reader = CustomAsyncFileReader {
           file_path: path.to_string(),
           parquet_metadata: metadata.clone(),
       };
       process_parquet_stream(remote_reader, metadata).await;
       Ok(())
   }
   ```
   
   Any help is greatly appreciated.
   
   
   

