cmsouza opened a new issue #17538:
URL: https://github.com/apache/airflow/issues/17538


   **Apache Airflow version**: 2.1.2
   
   **Apache Airflow Provider versions**
   apache-airflow-providers-celery==2.0.0
   apache-airflow-providers-ftp==2.0.0
   apache-airflow-providers-google==4.0.0
   apache-airflow-providers-imap==2.0.0
   apache-airflow-providers-mysql==2.0.0
   apache-airflow-providers-postgres==2.0.0
   apache-airflow-providers-sqlite==2.0.0
   
   **What happened**:
   
   When exporting data from MySQL to GCS as a Parquet file, and the data contains dates/datetimes, the following error occurs:
   
   ```
   [2021-08-09 14:30:36,760] {taskinstance.py:1501} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
       self._prepare_and_execute_task_with_callbacks(context, task)
     File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
       result = self._execute_task(context, task_copy)
     File "/opt/airflow/env/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
       result = task_copy.execute(context=context)
     File "/opt/airflow/env/lib/python3.7/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 154, in execute
       files_to_upload = self._write_local_data_files(cursor)
     File "/opt/airflow/env/lib/python3.7/site-packages/airflow/providers/google/cloud/transfers/sql_to_gcs.py", line 223, in _write_local_data_files
       tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
     File "pyarrow/table.pxi", line 1576, in pyarrow.lib.Table.from_pydict
     File "pyarrow/array.pxi", line 331, in pyarrow.lib.asarray
     File "pyarrow/array.pxi", line 305, in pyarrow.lib.array
     File "pyarrow/array.pxi", line 39, in pyarrow.lib._sequence_to_array
     File "pyarrow/error.pxi", line 122, in pyarrow.lib.pyarrow_internal_check_status
   TypeError: an integer is required (got type str)
   ```
   
   This is a result of the following code casting the data to strings before storing the row:
   
   ```python
   # airflow/providers/google/cloud/transfers/sql_to_gcs.py@210
       for row in cursor:
         # Convert datetime objects to utc seconds, and decimals to floats.
         # Convert binary type object to string encoded with base64.
         row = self.convert_types(schema, col_type_dict, row)
   
   # which calls MySQLtoGCSOperator::convert_type
   # airflow/providers/google/cloud/transfers/mysql_to_gcs.py@120
     if isinstance(value, datetime):
       value = str(value)
     elif isinstance(value, timedelta):
       value = str((datetime.min + value).time())
   ```
   This is fine for string-based formats such as CSV/JSON, but pyarrow should receive the unconverted datetime object to be able to store the row correctly.
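
To make the distinction concrete, here is a minimal standard-library sketch of a format-aware conversion. `convert_value` is a hypothetical helper written for illustration only; it is not the operator's actual `convert_type` method, and it only mirrors the two `isinstance` branches quoted above.

```python
from datetime import datetime, timedelta


def convert_value(value, export_format):
    """Hypothetical helper: stringify temporal values only for text formats."""
    if export_format == "parquet":
        # Leave datetime/timedelta objects intact so a typed writer can encode them.
        return value
    if isinstance(value, datetime):
        return str(value)
    if isinstance(value, timedelta):
        # Same expression as in the excerpt above: render the delta as a time of day.
        return str((datetime.min + value).time())
    return value


row = [26417, datetime(2021, 8, 10, 11, 4, 6), timedelta(hours=1, minutes=2, seconds=3)]
csv_row = [convert_value(v, "csv") for v in row]
parquet_row = [convert_value(v, "parquet") for v in row]
```

With this sketch, `csv_row` holds strings for the two temporal values, while `parquet_row` still holds the original `datetime` and `timedelta` objects.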
   
   I think the easiest fix is just to not convert the row before passing it to 
pyarrow, like this:
   ```diff
   diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
   index 091c22f01..85cc3c98f 100644
   --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
   +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
   @@ -210,12 +210,12 @@ class BaseSQLToGCSOperator(BaseOperator):
            for row in cursor:
                # Convert datetime objects to utc seconds, and decimals to floats.
                # Convert binary type object to string encoded with base64.
   -            row = self.convert_types(schema, col_type_dict, row)
   +            str_row = self.convert_types(schema, col_type_dict, row)
   
                if self.export_format == 'csv':
                    if self.null_marker is not None:
   -                    row = [value if value is not None else self.null_marker for value in row]
   -                csv_writer.writerow(row)
   +                    str_row = [value if value is not None else self.null_marker for value in str_row]
   +                csv_writer.writerow(str_row)
                elif self.export_format == 'parquet':
                    if self.null_marker is not None:
                        row = [value if value is not None else self.null_marker for value in row]
   @@ -223,7 +223,7 @@ class BaseSQLToGCSOperator(BaseOperator):
                    tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
                    parquet_writer.write_table(tbl)
                else:
   -                row_dict = dict(zip(schema, row))
   +                row_dict = dict(zip(schema, str_row))
   
                    tmp_file_handle.write(
                        json.dumps(row_dict, sort_keys=True, ensure_ascii=False).encode("utf-8")
   ```
   However, I'm not sure whether this would cause any regressions.
   
   **What you expected to happen**:
   
   The parquet file should be written to disk and uploaded to GCS
   
   **How to reproduce it**:
   The following code reproduces it 100% of the time. It is a simplified version of the code in the actual Operator and fails for exactly the same reason:
   
   ```python
   from datetime import datetime
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   # Schema mirrors the failing case: an integer id plus a
   # second-resolution timestamp column.
   parquet_schema = pa.schema([
       ('id', pa.int64()),
       ('date_added', pa.timestamp('s')),
   ])
       
   parquet_writer = pq.ParquetWriter('test.parquet', parquet_schema)
   
   row_pydic = {
       'id': [26417],
       'date_added': ['2021-08-10 11:04:06'],  # str, as produced by convert_type
   }
   
   tbl = pa.Table.from_pydict(row_pydic, parquet_schema)  # fails: str given for a timestamp column
   parquet_writer.write_table(tbl)
   
   parquet_writer.close()
   ```
   
   And returns the following error:
   
   ```
   TypeError                                 Traceback (most recent call last)
   /tmp/ipykernel_377852/1020037443.py in <module>
        17 }
        18 
   ---> 19 tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
        20 parquet_writer.write_table(tbl)
        21 
   
   ~/export/env/lib/python3.9/site-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pydict()
   ~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib.asarray()
   ~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib.array()
   ~/export/env/lib/python3.9/site-packages/pyarrow/array.pxi in pyarrow.lib._sequence_to_array()
   ~/export/env/lib/python3.9/site-packages/pyarrow/error.pxi in pyarrow.lib.pyarrow_internal_check_status()
   TypeError: an integer is required (got type str)
   ```
   
   While this works:
   
   ```python
   from datetime import datetime
   import pyarrow as pa
   import pyarrow.parquet as pq
   
   # Same schema as the failing case: an integer id plus a
   # second-resolution timestamp column.
   parquet_schema = pa.schema([
       ('id', pa.int64()),
       ('date_added', pa.timestamp('s')),
   ])
       
   parquet_writer = pq.ParquetWriter('test.parquet', parquet_schema)
   
   row_pydic = {
       'id': [26417],
       'date_added': [datetime(2021, 8, 10, 11, 4, 6)],  # datetime object, not str
   }
   
   tbl = pa.Table.from_pydict(row_pydic, parquet_schema)
   parquet_writer.write_table(tbl)
   
   parquet_writer.close()
   ```
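
As a possible stopgap outside the operator, stringified timestamps could be parsed back into `datetime` objects before the table is built. The following is only a sketch under assumptions: it assumes the strings have the `YYYY-MM-DD HH:MM:SS` form shown in the failing example (no microseconds), and `restore_datetimes` and `TIMESTAMP_FORMAT` are hypothetical names, not part of Airflow or pyarrow.

```python
from datetime import datetime

# Assumed format: what str(datetime) yields for values without microseconds.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def restore_datetimes(row_pydic, timestamp_columns):
    """Hypothetical helper: parse stringified timestamps back into datetime objects."""
    restored = dict(row_pydic)
    for column in timestamp_columns:
        restored[column] = [
            # Non-string values (e.g. None or real datetimes) pass through unchanged.
            datetime.strptime(value, TIMESTAMP_FORMAT) if isinstance(value, str) else value
            for value in row_pydic[column]
        ]
    return restored


row_pydic = {
    "id": [26417],
    "date_added": ["2021-08-10 11:04:06"],
}
fixed = restore_datetimes(row_pydic, ["date_added"])
```

The resulting `fixed` dict has the same shape as the `row_pydic` in the working example above, so it could be passed to `pa.Table.from_pydict` in its place. Parsing strings back is lossy compared with never converting them, which is why the proposed fix avoids the conversion instead.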


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
