DanteOz opened a new issue, #1310:
URL: https://github.com/apache/arrow-adbc/issues/1310

   I'm running into errors when trying to bulk load data into PostgreSQL using `adbc_driver_postgresql` with a `pyarrow.dataset`. The dataset is composed of partitioned CSV files. I have verified that a `pyarrow.csv.open_csv()` reader works with `adbc_ingest()`. Additionally, it appears that loading the dataset into a `pyarrow.Table`, passing it to polars, and calling `df.to_arrow()` also works with `adbc_ingest()`.
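
   Condensed, the failing pattern is a dataset scanner handed straight to `adbc_ingest()` (a sketch with an illustrative table name and a placeholder connection URI; the full reproduction script is below):

   ```python
   import pyarrow.dataset as ds
   from adbc_driver_postgresql import dbapi

   CONNECTION_URI = ""  # placeholder: PostgreSQL connection URI

   dst = ds.dataset("test_data", format="csv")
   with dbapi.connect(uri=CONNECTION_URI) as conn:
       with conn.cursor() as cursor:
           # Fails for me, while an equivalent pyarrow.csv.open_csv() reader succeeds.
           cursor.adbc_ingest("test_table", dst.scanner().to_reader(), mode="replace")
       conn.commit()
   ```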
   
   The following is a script to reproduce the errors I have been getting:
   
   ## Python version
   3.11.6
   
   ## Dependencies
   ```
   polars==0.19.14 
   pyarrow==14.0.1 
   adbc_driver_postgresql==0.8.0 
   ```
   
   ## MRE
   ```python
   from pathlib import Path
   from random import randint
   
   import polars as pl
   import pyarrow as pa
   import pyarrow.csv as csv
   import pyarrow.dataset as ds
   from adbc_driver_postgresql import dbapi
   
   # ENTER POSTGRES CONNECTION URI
   CONNECTION_URI = ""
   CHUNK_SIZE = 1e9
   
   
   def gen_mock_data(base_path, n_rows=100, n_cols=10, n_years=10):
       for y in range(n_years):
           partition_df = pl.DataFrame(
               [{f"col_{i}": randint(0, 100) for i in range(n_cols)} for _ in range(n_rows)]
           )
           partition_df.write_csv(str(base_path / f"{2000+y}_data.csv"))
   
   
   def ingest_data(name, data, mode):
       with dbapi.connect(uri=CONNECTION_URI) as conn:
           with conn.cursor() as cursor:
               cursor.adbc_ingest(
                   db_schema_name="public",
                   table_name=name,
                   data=data,
                   mode=mode,
               )
           conn.commit()
   
   
   def test_csv_stream_reader(base_path):
       """Test adbc_ingest() with csv files using pyarrow.csv.open_csv()."""
       for path in base_path.iterdir():
           reader = csv.open_csv(
               str(path),
               parse_options=csv.ParseOptions(delimiter="|"),
               read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
           )
           ingest_data(name="test_csv_stream_reader", data=reader, 
mode="create_append")
   
   
   def test_csv_table(base_path):
       """Test adbc_ingest() with csv files using pyarrow.csv.open_csv(), read 
into a pyarrow Table."""
       for path in base_path.iterdir():
           reader = csv.open_csv(
               str(path),
               parse_options=csv.ParseOptions(delimiter="|"),
               read_options=csv.ReadOptions(block_size=CHUNK_SIZE),
           )
           ingest_data(name="test_csv_table", data=reader.read_all(), 
mode="create_append")
   
   
   def test_csv_dataset_table(base_path):
       """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow Table."""
       dst = ds.dataset(
           base_path,
           format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
           partitioning=ds.FilenamePartitioning(
               pa.schema([("year", pa.int64())]),
           ),
       )
       table = dst.to_table()
       ingest_data(name="test_csv_dataset_table", data=table, mode="replace")
   
   
   def test_csv_dataset_batch(base_path):
       """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into an iterator of pyarrow RecordBatches."""
       dst = ds.dataset(
           base_path,
           format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
           partitioning=ds.FilenamePartitioning(
               pa.schema([("year", pa.int64())]),
           ),
       )
       record_batches = dst.to_batches()
       ingest_data(name="test_csv_dataset_batch", data=record_batches, mode="replace")
   
   
   def test_csv_dataset_reader(base_path):
       """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow RecordBatchReader."""
       dst = ds.dataset(
           base_path,
           format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
           partitioning=ds.FilenamePartitioning(
               pa.schema([("year", pa.int64())]),
           ),
       )
       scanner = dst.scanner()
       ingest_data(name="test_csv_dataset_reader", data=scanner.to_reader(), mode="replace")
   
   
   def test_csv_dataset_polars_table(base_path):
       """Test adbc_ingest() with csv files using pyarrow.dataset.dataset(), read into a pyarrow Table. Demonstrates that round-tripping the table through polars makes ingestion succeed."""
       dst = ds.dataset(
           base_path,
           format=ds.CsvFileFormat(read_options=csv.ReadOptions(use_threads=False, block_size=CHUNK_SIZE)),
           partitioning=ds.FilenamePartitioning(
               pa.schema([("year", pa.int64())]),
           ),
       )
       df = pl.DataFrame(dst.to_table())
       ingest_data(name="test_csv_dataset_polars_table", data=df.to_arrow(), mode="replace")
   
   
   if __name__ == "__main__":
       base_path = Path("test_data").absolute()
       base_path.mkdir(exist_ok=True)
   
       gen_mock_data(base_path)
   
       # PASS
       test_csv_stream_reader(base_path)
       test_csv_table(base_path)
       test_csv_dataset_polars_table(base_path)
   
       # FAIL
       test_csv_dataset_table(base_path)
       test_csv_dataset_batch(base_path)
       test_csv_dataset_reader(base_path)
   ```
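
   In case it helps with triage: since the polars round trip works, here is a sketch of a pure-pyarrow variant that rebuilds the dataset into a fresh, contiguous table before handing it to `adbc_ingest()`. The helper name is just illustrative, and I have not confirmed whether this sidesteps the dataset-reader failures.

   ```python
   import pyarrow as pa
   import pyarrow.dataset as ds


   def dataset_to_fresh_reader(dataset: ds.Dataset) -> pa.RecordBatchReader:
       """Materialize the dataset and re-emit it as a plain RecordBatchReader.

       This mirrors what the polars round trip appears to do: the data ends up
       in newly allocated, contiguous buffers rather than the scanner's stream.
       """
       table = dataset.to_table().combine_chunks()
       return pa.RecordBatchReader.from_batches(table.schema, table.to_batches())
   ```

   If that variant also fails, it would suggest the problem is not just in how the batches are produced by the dataset scanner.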
   
   
   
   

