kcullimore opened a new issue, #44683:
URL: https://github.com/apache/arrow/issues/44683

   ### Describe the enhancement requested
   
   When uploading a Parquet file created with PyArrow to Google BigQuery, 
columns containing lists (e.g., List[str], List[int], List[float]) are 
interpreted by BigQuery as RECORD types with REPEATED mode instead of the 
expected primitive types (STRING, INTEGER, FLOAT) with REPEATED mode. 
   
   I expected BigQuery to recognize `int_column`, `str_column`, and `float_column` as arrays of integers, strings, and floats respectively (i.e., REPEATED primitive fields). Instead, BigQuery interprets these columns as nested RECORD types, which further complicates handling the data.
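
   For context, this is how the list columns look at the Parquet (physical) level. A minimal sketch (the file path and single column are illustrative, and I'm assuming PyArrow's default compliant nested types):

   ```python
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Illustrative single-column table mirroring the repro below.
    demo = pa.table({'int_column': pa.array([[1], [2, 2]], type=pa.list_(pa.int64()))})
    pq.write_table(demo, 'list_demo.parquet')

    # Print the Parquet-level schema. With compliant nested types, a list
    # column is written as the standard three-level LIST encoding: an outer
    # group annotated LIST, a repeated group named "list", and an inner field
    # named "element". Those names match the RECORD fields BigQuery infers below.
    print(pq.ParquetFile('list_demo.parquet').schema)
   ```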
   
   I've tried explicitly defining the schema in BigQuery and ensuring that the Parquet file's schema matches what I expect, but the behavior persists. I'm not sure whether this is expected or whether I'm missing something.
   
   I have an alternative workaround in mind (via JSON), but I would prefer to keep using PyArrow and Parquet.
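
   One load-time option that looks relevant (I have not verified that it resolves this) is BigQuery's list inference for Parquet. A sketch, assuming `bigquery.ParquetOptions` and its `enable_list_inference` flag behave as documented:

   ```python
    from google.cloud import bigquery

    # Untested sketch: per the BigQuery docs, enable_list_inference asks the
    # loader to interpret the standard Parquet LIST encoding as a REPEATED
    # field instead of nested RECORDs.
    parquet_options = bigquery.ParquetOptions()
    parquet_options.enable_list_inference = True

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    job_config.parquet_options = parquet_options
   ```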
   
   
   ## Example 
   
   To reproduce, create a Parquet file with PyArrow that includes columns of lists of integers, strings, and floats. Upload the file to BigQuery via a Cloud Storage bucket and inspect the resulting table schema and field values.
   
   ```python
   import os
   import pyarrow as pa
   import pyarrow.parquet as pq
   from google.cloud import bigquery, storage
   
    # Local file
    sample_filename = 'sample_data.parquet'
    sample_filepath = sample_filename
   
   # Create mock data
   data = {
       'id': [0, 1, 2, 3],
       'int_column': [[1], [2, 2], [], [999]],
       'str_column': [['a', 'aa'], ['b'], [], ['alpha']],
       'float_column': [[1.1], [2.2, 3.30], [], [9.029]]
   }
   schema = pa.schema([
       pa.field('id', pa.int64()),
       pa.field('int_column', pa.list_(pa.int64())),
       pa.field('str_column', pa.list_(pa.string())),
       pa.field('float_column', pa.list_(pa.float64())),
   ])
   table = pa.Table.from_pydict(data, schema=schema)
   print(table.schema)
   """
   id: int64
   int_column: list<item: int64>
     child 0, item: int64
   str_column: list<item: string>
     child 0, item: string
   float_column: list<item: double>
     child 0, item: double
   """
   
   # Write and read from parquet file
   pq.write_table(table, sample_filepath)
   imported_table = pq.read_table(sample_filepath)
   print(imported_table.schema)
   
   # Upload to bucket
   bucket_name = 'bucket_name'
   blob_uri = f'gs://{bucket_name}/{sample_filename}'
   storage_client = storage.Client()
   bucket = storage_client.bucket(bucket_name)
   blob = bucket.blob(sample_filename)
   blob.upload_from_filename(sample_filepath)
   
   # Upload to BigQuery table
   dataset_id = 'dataset_id'
   table_id = 'table_id'
   bq_client = bigquery.Client()
    # Client.dataset() is deprecated; build the table reference explicitly
    table_ref = bigquery.DatasetReference(bq_client.project, dataset_id).table(table_id)
   job_config = bigquery.LoadJobConfig(
       source_format=bigquery.SourceFormat.PARQUET,
       write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
   )
   load_job = bq_client.load_table_from_uri(
       blob_uri,
       table_ref,
       job_config=job_config,
   )
   load_job.result()
   
   # Review BQ table schema
   loaded_table = bq_client.get_table(table_ref)
   print(loaded_table.schema)
   """
    [SchemaField('id', 'INTEGER', 'NULLABLE', None, None, (), None),
     SchemaField('int_column', 'RECORD', 'NULLABLE', None, None, (SchemaField('list', 'RECORD', 'REPEATED', None, None, (SchemaField('element', 'INTEGER', 'NULLABLE', None, None, (), None),), None),), None),
     SchemaField('str_column', 'RECORD', 'NULLABLE', None, None, (SchemaField('list', 'RECORD', 'REPEATED', None, None, (SchemaField('element', 'STRING', 'NULLABLE', None, None, (), None),), None),), None),
     SchemaField('float_column', 'RECORD', 'NULLABLE', None, None, (SchemaField('list', 'RECORD', 'REPEATED', None, None, (SchemaField('element', 'FLOAT', 'NULLABLE', None, None, (), None),), None),), None)]
   """
   
   # Review BQ table data
   query = f'SELECT * FROM `{dataset_id}.{table_id}`'
   query_job = bq_client.query(query)
   bq_table = query_job.result().to_arrow()
    print(bq_table.schema)
   """
   id: int64
   int_column: struct<list: list<item: struct<element: int64>> not null>
     child 0, list: list<item: struct<element: int64>> not null
         child 0, item: struct<element: int64>
             child 0, element: int64
   str_column: struct<list: list<item: struct<element: string>> not null>
     child 0, list: list<item: struct<element: string>> not null
         child 0, item: struct<element: string>
             child 0, element: string
   float_column: struct<list: list<item: struct<element: double>> not null>
     child 0, list: list<item: struct<element: double>> not null
         child 0, item: struct<element: double>
             child 0, element: double
   """
   
    # Optional: verify that explicitly enforcing a schema does not help
   bq_schema = [
       bigquery.SchemaField('id', 'INTEGER', mode='NULLABLE'),
    bigquery.SchemaField('int_column', 'INTEGER', mode='REPEATED'),
       bigquery.SchemaField('str_column', 'STRING', mode='REPEATED'),
       bigquery.SchemaField('float_column', 'FLOAT', mode='REPEATED'),
   ]
   job_config = bigquery.LoadJobConfig(
       source_format=bigquery.SourceFormat.PARQUET,
       write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
       schema=bq_schema,
       autodetect=False,
   )
   load_job = bq_client.load_table_from_uri(
       blob_uri,
       table_ref,
       job_config=job_config,
   )
   load_job.result()
   loaded_table = bq_client.get_table(table_ref)
   print(loaded_table.schema)
   
   # Clear data
   os.remove(sample_filepath)
   blob.delete()
   bq_client.delete_table(table_ref)
   ```
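
   For completeness, a read-side sketch (untested, run before the cleanup step) that unwraps the inferred RECORD shape back into plain arrays in SQL; the `list`/`element` field names follow the schema BigQuery inferred above:

   ```python
    # Untested sketch, reusing bq_client, dataset_id, and table_id from above.
    # UNNEST(col.list) yields STRUCT<element> rows; ARRAY(SELECT element ...)
    # reassembles them into a plain array per row.
    flatten_query = f'''
    SELECT
      id,
      ARRAY(SELECT element FROM UNNEST(int_column.list)) AS int_column,
      ARRAY(SELECT element FROM UNNEST(str_column.list)) AS str_column,
      ARRAY(SELECT element FROM UNNEST(float_column.list)) AS float_column
    FROM `{dataset_id}.{table_id}`
    '''
    flattened = bq_client.query(flatten_query).result().to_arrow()
    print(flattened.schema)
   ```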
   
   Environment:
   •    Python 3.11.10
   •    Ubuntu 22.04.5
   •    pyarrow==18.0.0
   •    google-cloud-bigquery==3.26.0
   •    google-cloud-storage==2.18.2
   
   
   ### Component(s)
   
   Python

