GitHub user adriangb closed a discussion: Performant streaming of Parquet to
Postgres
I couldn't find anything out there, so I'm looking into writing something to
move data from Parquet files (possibly stored in an object store or served over
HTTP) into Postgres as fast as possible. For my use case I have a couple of
requirements:
- Able to work with larger-than-memory files without blowing up memory or
taking forever
- Can be hooked into from Python
My initial attempt used Polars, but unfortunately it is not able to read
Parquet files in batches efficiently, and converting from Arrow -> Python types
(via Polars) and then Python types -> Postgres (via asyncpg) is slow.
So naturally I'm looking at doing this in Rust to speed things up.
Here's my plan; I want to see if it seems viable. I'll have a Rust library that
goes from `RecordBatch` to Postgres' binary format. It'll look something like
this:
```rust
pub struct PostgresBinaryCopyEncoder {}

impl PostgresBinaryCopyEncoder {
    // Initialize with the Arrow schema shared by these RecordBatches
    pub fn new(schema: Schema) -> Self {}

    // If this is the first call, include the header in the returned data.
    // Gather all of the columns via RecordBatch::columns, zip the columns
    // and iterate over them row-wise. For each value, use the column's type
    // to downcast into a Rust type, then use the ToSql trait (already
    // implemented for Rust values) to encode into pg's binary format.
    pub fn encode(&mut self, batch: &RecordBatch) -> Result<&[u8], Error> {}

    // Get the trailer bytes
    pub fn finalize(&mut self) -> Result<&[u8], Error> {}
}
```
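For reference, here is a rough sketch of the framing `encode`/`finalize` would have to produce, based on the documented Postgres binary COPY format and the `bytes` / `postgres-types` crates. The `write_*` helpers and the flat `fields` slice are illustrative assumptions, not part of the plan above:

```rust
use bytes::{BufMut, BytesMut};
use postgres_types::{IsNull, ToSql, Type};

// COPY BINARY header: 11-byte signature, 4-byte flags, 4-byte extension length.
fn write_header(buf: &mut BytesMut) {
    buf.put_slice(b"PGCOPY\n\xff\r\n\0");
    buf.put_i32(0); // flags field
    buf.put_i32(0); // header extension area length
}

// One row: a 2-byte field count, then a 4-byte length + payload per field.
fn write_row(
    buf: &mut BytesMut,
    fields: &[(&(dyn ToSql + Sync), Type)],
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
    buf.put_i16(fields.len() as i16);
    for (value, ty) in fields {
        let len_pos = buf.len();
        buf.put_i32(0); // length placeholder, patched below
        match value.to_sql_checked(ty, buf)? {
            // NULL is encoded as a length of -1 with no payload.
            IsNull::Yes => buf[len_pos..len_pos + 4].copy_from_slice(&(-1i32).to_be_bytes()),
            IsNull::No => {
                let written = (buf.len() - len_pos - 4) as i32;
                buf[len_pos..len_pos + 4].copy_from_slice(&written.to_be_bytes());
            }
        }
    }
    Ok(())
}

// Trailer: a field count of -1 marks the end of the COPY data.
fn write_trailer(buf: &mut BytesMut) {
    buf.put_i16(-1);
}
```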
And then a Python side that manages the IO, so it can decide to stream the
bytes into Postgres directly, write them to a file somewhere, read the Parquet
file from disk, run in threads for an async environment, etc. That would look
something like this:
```python
from typing import Iterable

from pyarrow.ipc import RecordBatchStreamReader

from thislib._rust_stuff import PostgresBinaryCopyEncoder as PostgresBinaryCopyEncoderRust


class PostgresBinaryCopyEncoder:
    def __init__(self, stream: RecordBatchStreamReader) -> None:
        self._stream = stream

    def stream(self) -> Iterable[bytes]:
        schema = self._stream.schema
        encoder = PostgresBinaryCopyEncoderRust(schema)
        while True:
            try:
                batch = self._stream.read_next_batch()
            except StopIteration:
                yield encoder.finalize()
                return
            yield encoder.encode(batch)
```
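As a usage sketch, streaming the output into Postgres could look like the following. This assumes psycopg 3 on the Python side and hypothetical `my_table` / file / connection-string names; any driver that accepts raw COPY bytes would work just as well:

```python
# Illustrative only: pipe the encoder's output into Postgres using psycopg 3.
import psycopg
from pyarrow.ipc import RecordBatchStreamReader

with open("data.arrows", "rb") as f, psycopg.connect("postgresql://localhost/mydb") as conn:
    reader = RecordBatchStreamReader(f)
    encoder = PostgresBinaryCopyEncoder(reader)
    with conn.cursor() as cur:
        with cur.copy("COPY my_table FROM STDIN (FORMAT BINARY)") as copy:
            for chunk in encoder.stream():
                copy.write(chunk)
```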
I've already written something to encode Rust types into Postgres types based
on
[rust-postgres-binary-copy](https://github.com/sfackler/rust-postgres-binary-copy).
Where I'm a bit confused is how to iterate over a RecordBatch and get Rust
types out of it. Alternatively, I guess we could iterate over a RecordBatch and
get Arrow data out of it, but then I'd need to re-implement the
`postgres_types::ToSql` trait for all Arrow types. And I expect the extra hop
from Arrow -> native Rust types to be quite cheap.
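For what it's worth, here is a minimal sketch of that iteration, assuming a batch with a known two-column schema (Int64, Utf8); a real implementation would match on each column's `DataType` instead of hard-coding the downcasts:

```rust
use arrow::array::{Array, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;

// Downcast each column once, then pull native Rust values out row by row.
fn rows(batch: &RecordBatch) {
    let ids = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("column 0 should be Int64");
    let names = batch
        .column(1)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("column 1 should be Utf8");

    for row in 0..batch.num_rows() {
        // Option<T> already implements ToSql, so NULLs map naturally.
        let id: Option<i64> = (!ids.is_null(row)).then(|| ids.value(row));
        let name: Option<&str> = (!names.is_null(row)).then(|| names.value(row));
        // ... feed (id, name) to the binary COPY encoder here ...
        let _ = (id, name);
    }
}
```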
GitHub link: https://github.com/apache/arrow-rs/discussions/3641