GitHub user adriangb closed a discussion: Performant streaming of Parquet to 
Postgres

I couldn't find anything out there, so I'm looking into writing something to 
move data from Parquet files (possibly stored in an object store or served 
over HTTP) into Postgres as fast as possible. For my use case I have a couple 
of requirements:
- Able to work with larger than memory files without blowing up memory or 
taking forever
- Can be hooked into from Python

My initial attempt used Polars, but unfortunately it is not able to read 
Parquet files in batches efficiently, and converting from Arrow -> Python types 
(via Polars) and then Python types -> Postgres (via asyncpg) is slow.

So naturally I'm looking at doing this in Rust to speed things up.

Here's my plan; I want to see if it seems viable. I'll have a Rust library that 
goes from `RecordBatch` to Postgres' binary format. It'll look something like 
this:

```rust
pub struct PostgresBinaryCopyEncoder {}

impl PostgresBinaryCopyEncoder {
    // Initialize with arrow schema for these RecordBatches
    pub fn new(types: Schema) -> Self {}

    // If this is the first call, include the header in the returned data.
    // Gather all of the columns via RecordBatch::columns,
    // zip the columns and iterate over them row-wise.
    // For each value use the column's type to downcast into a Rust type,
    // then use the ToSql trait, which is already implemented for Rust values,
    // to encode into pg's binary format.
    pub fn encode(&mut self, values: &RecordBatch) -> Result<&[u8], Error> {}

    // Get the footer
    pub fn finalize(&mut self) -> Result<&[u8], Error> {}
}
```
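
To make the "larger than memory" requirement concrete, here's roughly how I'd expect the encoder to be driven from pure Rust, just as a sketch: `PostgresBinaryCopyEncoder` is the (not yet written) interface above, `write_to_copy_sink` is a placeholder for whatever ends up doing the IO, and the file is read in fixed-size batches with parquet's `ParquetRecordBatchReaderBuilder`:

```rust
use std::fs::File;

use arrow::record_batch::RecordBatchReader;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn copy_parquet(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Pull the file through in fixed-size batches so memory stays bounded.
    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(8192)
        .build()?;

    // `PostgresBinaryCopyEncoder` is the interface sketched above.
    let mut encoder = PostgresBinaryCopyEncoder::new(reader.schema().as_ref().clone());

    for batch in reader {
        // Header (on the first call) + one binary tuple per row in the batch.
        let bytes = encoder.encode(&batch?)?;
        write_to_copy_sink(bytes)?;
    }
    // Trailer that closes out the binary COPY stream.
    write_to_copy_sink(encoder.finalize()?)?;
    Ok(())
}

// Placeholder for whatever ends up driving `COPY ... FROM STDIN (FORMAT binary)`:
// a postgres CopyInWriter, a file, a pipe back to Python, etc.
fn write_to_copy_sink(_bytes: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    todo!()
}
```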

And then a Python side that manages the IO, so that it can decide to stream the 
bytes into Postgres directly, write them to a file somewhere, read the Parquet 
file from disk, run the work in threads for an async environment, etc. That 
would look something like this:

```python
from typing import Iterable

from pyarrow import RecordBatchStreamReader

from thislib._rust_stuff import PostgresBinaryCopyEncoder as PostgresBinaryCopyEncoderRust


class PostgresBinaryCopyEncoder:
    def __init__(self, stream: RecordBatchStreamReader) -> None:
        self._stream = stream

    def stream(self) -> Iterable[bytes]:
        schema = self._stream.schema
        encoder = PostgresBinaryCopyEncoderRust(schema)
        while True:
            try:
                batch = self._stream.read_next_batch()
                yield encoder.encode(batch)
            except StopIteration:
                yield encoder.finalize()
                return
```

I've already written something to encode Rust types into Postgres types based 
on 
[rust-postgres-binary-copy](https://github.com/sfackler/rust-postgres-binary-copy).
Where I'm a bit confused is how to iterate over a RecordBatch and get Rust 
types out of it. Alternatively I guess we could iterate over a RecordBatch and 
get Arrow data out of it, but then I'd need to re-implement the 
`postgres_types::ToSql` trait for all Arrow types. And I expect the extra hop 
from Arrow -> native Rust types to be quite cheap.
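
What I have in mind (and where I'm not sure it's the idiomatic way) is to match on each column's `DataType` and downcast to the concrete array type, which yields native Rust values that already have `ToSql` impls. A minimal sketch, showing only Int64 and Utf8; every other Arrow type would need its own match arm:

```rust
use arrow::array::{Array, Int64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;

// Sketch: walk the batch row-wise and pull native Rust values out of each column.
fn encode_rows(batch: &RecordBatch) {
    for row in 0..batch.num_rows() {
        for column in batch.columns() {
            match column.data_type() {
                DataType::Int64 => {
                    let array = column.as_any().downcast_ref::<Int64Array>().unwrap();
                    // Option<i64> already implements postgres_types::ToSql,
                    // so this value could go straight into the COPY buffer.
                    let _value: Option<i64> =
                        (!array.is_null(row)).then(|| array.value(row));
                }
                DataType::Utf8 => {
                    let array = column.as_any().downcast_ref::<StringArray>().unwrap();
                    let _value: Option<&str> =
                        (!array.is_null(row)).then(|| array.value(row));
                }
                other => unimplemented!("no encoder for {:?} yet", other),
            }
        }
    }
}
```

In practice I'd probably downcast each column once per batch rather than once per value, but hopefully this shows the shape of it.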

GitHub link: https://github.com/apache/arrow-rs/discussions/3641
