DanteOz opened a new issue, #1310:
URL: https://github.com/apache/arrow-adbc/issues/1310
I'm running into errors when trying to bulk load data into Postgres using
`adbc_driver_postgresql` with a `pyarrow.dataset`. The dataset is composed of
partitioned CSV files. I have verified that `pyarrow.csv.open_csv()` works with
`adbc_ingest()`. Additionally, it appears that loading the dataset into a
`pyarrow.Table`, passing it to polars, and calling `df.to_arrow()` also works
with `adbc_ingest()`.
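For a quick contrast, here is a condensed sketch of the two paths (the table names and `postgresql://...` URI are placeholders, and a local `test_data` directory of CSVs is assumed; the full reproduction script is below):

```python
import polars as pl
import pyarrow.dataset as ds
from adbc_driver_postgresql import dbapi

dst = ds.dataset("test_data", format="csv")

with dbapi.connect(uri="postgresql://...") as conn:
    with conn.cursor() as cursor:
        # Works: round-trip the dataset through polars before ingesting
        ok_table = pl.DataFrame(dst.to_table()).to_arrow()
        cursor.adbc_ingest(table_name="works", data=ok_table, mode="replace")

        # Fails: ingest the dataset's Table / batches / reader directly
        cursor.adbc_ingest(table_name="fails", data=dst.to_table(), mode="replace")
    conn.commit()
```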
The following script reproduces the errors I have been getting:
## Python version
3.11.6
## Dependencies
```
polars==0.19.14
pyarrow==14.0.1
adbc_driver_postgresql==0.8.0
```
## MRE
```python
from pathlib import Path
from random import randint

import polars as pl
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds
from adbc_driver_postgresql import dbapi

# ENTER POSTGRES CONNECTION URI
CONNECTION_URI = ""
CHUNK_SIZE = 1e9


def gen_mock_data(base_path, n_rows=100, n_cols=10, n_years=10):
    for y in range(n_years):
        partition_df = pl.DataFrame(
            [{f"col_{i}": randint(0, 100) for i in range(n_cols)} for j in range(n_rows)]
        )
        partition_df.write_csv(str(base_path / f"{2000+y}_data.csv"))


def ingest_data(name, data, mode):
    with dbapi.connect(uri=CONNECTION_URI) as conn:
        with conn.cursor() as cursor:
            cursor.adbc_ingest(
                db_schema_name="public",
                table_name=name,
                data=data,
                mode=mode,
            )
        conn.commit()


def test_csv_stream_reader(base_path):
    """Test adbc_ingest() with csv files using pyarrow.csv.open_csv()."""
    for path in base_path.iterdir():
        reader = csv.open_csv(
            str(path),
            parse_options=csv.ParseOptions(delimiter="|"),
            read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
        )
        ingest_data(name="test_csv_stream_reader", data=reader, mode="create_append")


def test_csv_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.csv.open_csv(),
    read into a pyarrow Table."""
    for path in base_path.iterdir():
        reader = csv.open_csv(
            str(path),
            parse_options=csv.ParseOptions(delimiter="|"),
            read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
        )
        ingest_data(name="test_csv_table", data=reader.read_all(), mode="create_append")


def test_csv_dataset_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(),
    read into a pyarrow Table."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(
            read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)
        ),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    table = dst.to_table()
    ingest_data(name="test_csv_dataset_table", data=table, mode="replace")


def test_csv_dataset_batch(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(),
    read into pyarrow RecordBatches."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(
            read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)
        ),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    record_batch = dst.to_batches()
    ingest_data(name="test_csv_dataset_batch", data=record_batch, mode="replace")


def test_csv_dataset_reader(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(),
    read into a pyarrow RecordBatchReader."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(
            read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)
        ),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    scanner = dst.scanner()
    ingest_data(name="test_csv_dataset_reader", data=scanner.to_reader(), mode="replace")


def test_csv_dataset_polars_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(),
    read into a pyarrow Table. Demonstrates that processing through polars
    allows the table to be rewritten."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(
            read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)
        ),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    df = pl.DataFrame(dst.to_table())
    ingest_data(name="test_csv_dataset_polars_table", data=df.to_arrow(), mode="replace")


if __name__ == "__main__":
    base_path = Path("test_data").absolute()
    base_path.mkdir(exist_ok=True)
    gen_mock_data(base_path)

    # PASS
    test_csv_stream_reader(base_path)
    test_csv_table(base_path)
    test_csv_dataset_polars_table(base_path)

    # FAIL
    test_csv_dataset_table(base_path)
    test_csv_dataset_batch(base_path)
    test_csv_dataset_reader(base_path)
```