DanteOz opened a new issue, #1310: URL: https://github.com/apache/arrow-adbc/issues/1310
I'm running into errors when trying to bulk load data into Postgres using `adbc_driver_postgresql` with a `pyarrow.dataset`. The dataset is composed of partitioned CSV files. I have verified that using `pyarrow.csv.open_csv()` works with `adbc_ingest()`. Additionally, it appears that loading the dataset into a `pyarrow.Table`, passing it to polars, and calling `df.to_arrow()` also works with `adbc_ingest()`. The following script reproduces the errors I have been getting:

## Python version

3.11.6

## Dependencies

```
polars==0.19.14
pyarrow==14.0.1
adbc_driver_postgresql==0.8.0
```

## MRE

```python
from pathlib import Path
from random import randint

import polars as pl
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds
from adbc_driver_postgresql import dbapi

# ENTER POSTGRES CONNECTION URI
CONNECTION_URI = ""
CHUNK_SIZE = int(1e9)  # block_size must be an integer


def gen_mock_data(base_path, n_rows=100, n_cols=10, n_years=10):
    """Write one comma-delimited CSV file per year partition."""
    for y in range(n_years):
        partition_df = pl.DataFrame(
            [{f"col_{i}": randint(0, 100) for i in range(n_cols)} for _ in range(n_rows)]
        )
        partition_df.write_csv(str(base_path / f"{2000+y}_data.csv"))


def ingest_data(name, data, mode):
    with dbapi.connect(uri=CONNECTION_URI) as conn:
        with conn.cursor() as cursor:
            cursor.adbc_ingest(
                db_schema_name="public",
                table_name=name,
                data=data,
                mode=mode,
            )
        conn.commit()


def test_csv_stream_reader(base_path):
    """Test adbc_ingest() with csv files using pyarrow.csv.open_csv()."""
    for path in base_path.iterdir():
        reader = csv.open_csv(
            str(path),
            read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
        )
        ingest_data(name="test_csv_stream_reader", data=reader, mode="create_append")


def test_csv_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.csv.open_csv(), read into a pyarrow Table."""
    for path in base_path.iterdir():
        reader = csv.open_csv(
            str(path),
            read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
        )
        ingest_data(name="test_csv_table", data=reader.read_all(), mode="create_append")


def test_csv_dataset_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow Table."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    table = dst.to_table()
    ingest_data(name="test_csv_dataset_table", data=table, mode="replace")


def test_csv_dataset_batch(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read as an iterator of pyarrow RecordBatches."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    batches = dst.to_batches()
    ingest_data(name="test_csv_dataset_batch", data=batches, mode="replace")


def test_csv_dataset_reader(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow RecordBatchReader."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    scanner = dst.scanner()
    ingest_data(name="test_csv_dataset_reader", data=scanner.to_reader(), mode="replace")


def test_csv_dataset_polars_table(base_path):
    """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow Table.

    Demonstrates that a round trip through polars allows the table to be ingested."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    df = pl.DataFrame(dst.to_table())
    ingest_data(name="test_csv_dataset_polars_table", data=df.to_arrow(), mode="replace")


if __name__ == "__main__":
    base_path = Path("test_data").absolute()
    base_path.mkdir(exist_ok=True)
    gen_mock_data(base_path)

    # PASS
    test_csv_stream_reader(base_path)
    test_csv_table(base_path)
    test_csv_dataset_polars_table(base_path)

    # FAIL
    test_csv_dataset_table(base_path)
    test_csv_dataset_batch(base_path)
    test_csv_dataset_reader(base_path)
```
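For what it's worth, the polars round trip can be approximated without polars by rebuilding the table from plain Python values before ingesting. The `test_csv_dataset_rebuilt_table` helper below is only a sketch of a possible workaround, not part of the failing tests, and is based on my guess that reconstructing the arrays discards whatever chunking or buffer layout the dataset scanner produces. It is a drop-in addition to the MRE above and reuses its imports, `CHUNK_SIZE`, and `ingest_data()`:

```python
def test_csv_dataset_rebuilt_table(base_path):
    """Sketch: rebuild the dataset-produced table from plain Python values
    before ingesting, approximating the polars round trip without polars."""
    dst = ds.dataset(
        base_path,
        format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
        partitioning=ds.FilenamePartitioning(
            pa.schema([("year", pa.int64())]),
        ),
    )
    table = dst.to_table().combine_chunks()
    # Reconstruct every column as a fresh, single-chunk array so that no
    # scanner-specific chunking or buffer layout reaches adbc_ingest().
    rebuilt = pa.table(
        {name: pa.array(table.column(name).to_pylist()) for name in table.column_names}
    )
    ingest_data(name="test_csv_dataset_rebuilt_table", data=rebuilt, mode="replace")
```

If this rebuilt table also ingests cleanly, that would suggest the driver is tripping on the arrays coming out of the scanner rather than on the schema itself.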